`deribit.feed`: fix "trade" event streaming
The main change needed to make `piker.data.feed._FeedsBus` work was to correctly format the `'trade'` msgs with the (new schema) expected `'ticks': list[dict]` field which, - we compute the `piker` quote-msg-`dict` from the (now directly proxied through) `cryptofeed.types.Trade`'s fields inside the body of `stream_quotes()`. - similarly, move the `'l1'` msg processing, **out of** the `asyncio`-side `_l1()` callback (defined as a closure in `.api.aio_price_feed_relay()` and passed to the `cryptofeed.FeedHandler`) and instead mod the callback to simply pass through the `.types.L1Book` ref directly to the `piker`/`trio` side task for conversion. In support of all that, - mask-to-drop the alt-branch to wait on a first rt event when the `cryptofeed.LastTradesResult.trades: list[Trade]` is empty; doesn't seem like this ever even happens? - add a buncha typing, comments and doc-strs to the routines in `.deribit.api` including notes on where we can choose to mod the `.bs_fqme` for our eventually preferred `piker` style format. - simplify some nested `@acm` enters to the new single `async with <tuple>)` style. - be particularly pedantic about typing `tractor.to_asyncio.LinkedTaskChannel` - bit of pep8 line-spacing fixes in `.venues`.
							parent
							
								
									149e03813a
								
							
						
					
					
						commit
						ae6e5f5994
					
				|  | @ -55,9 +55,10 @@ from cryptofeed.defines import ( | |||
|     OPTION, CALL, PUT | ||||
| ) | ||||
| from cryptofeed.symbols import Symbol | ||||
| 
 | ||||
| # types for managing the cb callbacks. | ||||
| # from cryptofeed.types import L1Book | ||||
| from cryptofeed.types import ( | ||||
|     L1Book, | ||||
|     Trade, | ||||
| ) | ||||
| from piker.brokers import SymbolNotFound | ||||
| from .venues import ( | ||||
|     _ws_url, | ||||
|  | @ -66,9 +67,7 @@ from .venues import ( | |||
|     Pair, | ||||
|     OptionPair, | ||||
|     JSONRPCResult, | ||||
|     # JSONRPCChannel, | ||||
|     KLinesResult, | ||||
|     # Trade, | ||||
|     LastTradesResult, | ||||
| ) | ||||
| from piker.accounting import ( | ||||
|  | @ -98,9 +97,17 @@ _spawn_kwargs = { | |||
| } | ||||
| 
 | ||||
| 
 | ||||
| # convert datetime obj timestamp to unixtime in milliseconds | ||||
| def deribit_timestamp(when) -> int: | ||||
|     return int((when.timestamp() * 1000) + (when.microsecond / 1000)) | ||||
| def deribit_timestamp(when: datetime) -> int: | ||||
|     ''' | ||||
|     Convert conventional epoch timestamp, in secs, to unixtime in | ||||
|     milliseconds. | ||||
| 
 | ||||
|     ''' | ||||
|     return int( | ||||
|         (when.timestamp() * 1000) | ||||
|         + | ||||
|         (when.microsecond / 1000) | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def str_to_cb_sym(name: str) -> Symbol: | ||||
|  | @ -155,11 +162,28 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: | |||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def cb_sym_to_deribit_inst(sym: Symbol): | ||||
|     new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) | ||||
|     otype = 'C' if sym.option_type == CALL else 'P' | ||||
| # TODO, instead can't we just lookup the `MktPair` directly | ||||
| # and pass it upward to `stream_quotes()`?? | ||||
| def cb_sym_to_deribit_inst(sym: Symbol) -> str: | ||||
|     ''' | ||||
|     Generate our own internal `str`-repr for a `cryptofeed.Symbol` | ||||
|     uniquely from its fields. | ||||
| 
 | ||||
|     return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' | ||||
|     This is the equiv of generating a `Pair.fmqe` from `cryptofeed` | ||||
|     for now i suppose..? | ||||
| 
 | ||||
|     ''' | ||||
|     new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) | ||||
|     otype = ( | ||||
|         'C' if sym.option_type == CALL | ||||
|         else 'P' | ||||
|     ) | ||||
|     return ( | ||||
|         f'{sym.base}-' | ||||
|         f'{new_expiry_date}-' | ||||
|         f'{sym.strike_price}-' | ||||
|         f'{otype}' | ||||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| def get_values_from_cb_normalized_date(expiry_date: str) -> str: | ||||
|  | @ -598,7 +622,7 @@ async def get_client( | |||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_feed_handler(): | ||||
| async def open_feed_handler() -> FeedHandler: | ||||
|     fh = FeedHandler(config=get_config()) | ||||
|     yield fh | ||||
|     await to_asyncio.run_task(fh.stop_async) | ||||
|  | @ -619,59 +643,37 @@ async def aio_price_feed_relay( | |||
|     from_trio: asyncio.Queue, | ||||
|     to_trio: trio.abc.SendChannel, | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Relay price feed quotes from the `cryptofeed.FeedHandler` to | ||||
|     the `piker`-side `trio.task` consumers for delivery to consumer | ||||
|     sub-actors for various subsystems. | ||||
| 
 | ||||
|     ''' | ||||
|     async def _trade( | ||||
|         data: dict, | ||||
|         trade: Trade,  # cryptofeed, NOT ours from `.venues`! | ||||
|         receipt_timestamp: int, | ||||
|     ) -> None: | ||||
|         ''' | ||||
|         Send `cryptofeed.FeedHandler` quotes to `piker`-side | ||||
|         `trio.Task`. | ||||
|         Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. | ||||
| 
 | ||||
|         ''' | ||||
|         to_trio.send_nowait(( | ||||
|             'trade', { | ||||
|                 'symbol': cb_sym_to_deribit_inst( | ||||
|                     str_to_cb_sym(data.symbol)).lower(), | ||||
|                 'last': data, | ||||
|                 'broker_ts': time.time(), | ||||
|                 'data': data.to_dict(), | ||||
|                 'receipt': receipt_timestamp, | ||||
|             }, | ||||
|         )) | ||||
|         to_trio.send_nowait(('trade', trade)) | ||||
| 
 | ||||
|     async def _l1( | ||||
|         data: dict, | ||||
|         book: L1Book, | ||||
|         receipt_timestamp: int, | ||||
|     ) -> None: | ||||
|         to_trio.send_nowait(( | ||||
|             'l1', { | ||||
|                'symbol': cb_sym_to_deribit_inst( | ||||
|                     str_to_cb_sym(data.symbol)).lower(), | ||||
|                 'ticks': [ | ||||
|                     { | ||||
|                         'type': 'bid', | ||||
|                         'price': float(data.bid_price), | ||||
|                         'size': float(data.bid_size) | ||||
|                     }, | ||||
|                     { | ||||
|                         'type': 'bsize', | ||||
|                         'price': float(data.bid_price), | ||||
|                         'size': float(data.bid_size) | ||||
|                     }, | ||||
|                     { | ||||
|                         'type': 'ask', | ||||
|                         'price': float(data.ask_price), | ||||
|                         'size': float(data.ask_size) | ||||
|                     }, | ||||
|                     { | ||||
|                         'type': 'asize', | ||||
|                         'price': float(data.ask_price), | ||||
|                         'size': float(data.ask_size) | ||||
|                     } | ||||
|                 ] | ||||
|             }, | ||||
|         )) | ||||
|         ''' | ||||
|         Relay-thru "l1 book" updates. | ||||
| 
 | ||||
|         ''' | ||||
| 
 | ||||
|         to_trio.send_nowait(('l1', book)) | ||||
| 
 | ||||
|         # TODO, make this work! | ||||
|         # -[ ] why isn't this working in `tractor.pause_from_sync()`?? | ||||
|         # breakpoint() | ||||
| 
 | ||||
|     sym: Symbol = piker_sym_to_cb_sym(instrument) | ||||
|     fh.add_feed( | ||||
|         DERIBIT, | ||||
|  | @ -685,27 +687,35 @@ async def aio_price_feed_relay( | |||
|     if not fh.running: | ||||
|         fh.run( | ||||
|             start_loop=False, | ||||
|             install_signal_handlers=False) | ||||
|             install_signal_handlers=False | ||||
|         ) | ||||
| 
 | ||||
|     # sync with trio | ||||
|     to_trio.send_nowait(None) | ||||
| 
 | ||||
|     # run until cancelled | ||||
|     await asyncio.sleep(float('inf')) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
| async def open_price_feed( | ||||
|     instrument: str | ||||
| ) -> trio.abc.ReceiveStream: | ||||
|     async with maybe_open_feed_handler() as fh: | ||||
|         async with to_asyncio.open_channel_from( | ||||
| ) -> to_asyncio.LinkedTaskChannel: | ||||
| 
 | ||||
|     fh: FeedHandler | ||||
|     first: None | ||||
|     chan: to_asyncio.LinkedTaskChannel | ||||
|     async with ( | ||||
|         maybe_open_feed_handler() as fh, | ||||
|         to_asyncio.open_channel_from( | ||||
|             partial( | ||||
|                 aio_price_feed_relay, | ||||
|                 fh, | ||||
|                 instrument | ||||
|             ) | ||||
|         ) as (first, chan): | ||||
|             yield chan | ||||
|         ) as (first, chan) | ||||
|     ): | ||||
|         yield chan | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
|  | @ -714,6 +724,7 @@ async def maybe_open_price_feed( | |||
| ) -> trio.abc.ReceiveStream: | ||||
| 
 | ||||
|     # TODO: add a predicate to maybe_open_context | ||||
|     feed: to_asyncio.LinkedTaskChannel | ||||
|     async with maybe_open_context( | ||||
|         acm_func=open_price_feed, | ||||
|         kwargs={ | ||||
|  |  | |||
|  | @ -63,6 +63,7 @@ from .api import ( | |||
|     # get_config, | ||||
|     piker_sym_to_cb_sym, | ||||
|     cb_sym_to_deribit_inst, | ||||
|     str_to_cb_sym, | ||||
|     maybe_open_price_feed | ||||
| ) | ||||
| from .venues import ( | ||||
|  | @ -237,13 +238,19 @@ async def stream_quotes( | |||
|     ''' | ||||
|     Open a live quote stream for the market set defined by `symbols`. | ||||
| 
 | ||||
|     Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side | ||||
|     task and relays through L1 and `Trade` msgs here to our `trio.Task`. | ||||
| 
 | ||||
|     ''' | ||||
|     sym = symbols[0].split('.')[0] | ||||
|     init_msgs: list[FeedInit] = [] | ||||
| 
 | ||||
|     # multiline nested `dict` formatter (since rn quote-msgs are | ||||
|     # just that). | ||||
|     pfmt: Callable[[str], str] = mk_repr() | ||||
|     pfmt: Callable[[str], str] = mk_repr( | ||||
|         # so we can see `deribit`'s delightfully mega-long bs fields.. | ||||
|         maxstring=100, | ||||
|     ) | ||||
| 
 | ||||
|     async with ( | ||||
|         open_cached_client('deribit') as client, | ||||
|  | @ -262,25 +269,31 @@ async def stream_quotes( | |||
|         # build `cryptofeed` feed-handle | ||||
|         cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) | ||||
| 
 | ||||
|         async with maybe_open_price_feed(sym) as stream: | ||||
|             last_trades = ( | ||||
|                 await client.last_trades( | ||||
|                     cb_sym_to_deribit_inst(cf_sym), | ||||
|                     count=1, | ||||
|                 ) | ||||
|             ).trades | ||||
|         from_cf: tractor.to_asyncio.LinkedTaskChannel | ||||
|         async with maybe_open_price_feed(sym) as from_cf: | ||||
| 
 | ||||
|             if len(last_trades) == 0: | ||||
|                 last_trade = None | ||||
|                 async for typ, quote in stream: | ||||
|                     if typ == 'trade': | ||||
|                         last_trade = Trade(**(quote['data'])) | ||||
|                         break | ||||
|             # load the "last trades" summary | ||||
|             last_trades_res: cryptofeed.LastTradesResult = await client.last_trades( | ||||
|                 cb_sym_to_deribit_inst(cf_sym), | ||||
|                 count=1, | ||||
|             ) | ||||
|             last_trades: list[Trade] = last_trades_res.trades | ||||
| 
 | ||||
|             else: | ||||
|                 last_trade = Trade(**(last_trades[0])) | ||||
|             # TODO, do we even need this or will the above always | ||||
|             # work? | ||||
|             # if not last_trades: | ||||
|             #     await tractor.pause() | ||||
|             #     async for typ, quote in from_cf: | ||||
|             #         if typ == 'trade': | ||||
|             #             last_trade = Trade(**(quote['data'])) | ||||
|             #             break | ||||
| 
 | ||||
|             first_quote = { | ||||
|             # else: | ||||
|             last_trade = Trade( | ||||
|                 **(last_trades[0]) | ||||
|             ) | ||||
| 
 | ||||
|             first_quote: dict = { | ||||
|                 'symbol': sym, | ||||
|                 'last': last_trade.price, | ||||
|                 'brokerd_ts': last_trade.timestamp, | ||||
|  | @ -305,14 +318,69 @@ async def stream_quotes( | |||
|             topic: str = mkt.bs_fqme | ||||
| 
 | ||||
|             # deliver until cancelled | ||||
|             async for typ, quote in stream: | ||||
|                 sym: str = quote['symbol'] | ||||
|                 log.info( | ||||
|                     f'deribit {typ!r} quote for {sym!r}\n\n' | ||||
|                     f'{pfmt(quote)}\n' | ||||
|                 ) | ||||
|             async for typ, ref in from_cf: | ||||
|                 match typ: | ||||
|                     case 'trade': | ||||
|                         trade: cryptofeed.types.Trade = ref | ||||
| 
 | ||||
|                         # TODO, re-impl this according to teh ideal | ||||
|                         # fqme for opts that we choose!! | ||||
|                         bs_fqme: str = cb_sym_to_deribit_inst( | ||||
|                             str_to_cb_sym(trade.symbol) | ||||
|                         ).lower() | ||||
| 
 | ||||
|                         piker_quote: dict = { | ||||
|                             'symbol': bs_fqme, | ||||
|                             'last': trade.price, | ||||
|                             'broker_ts': time.time(), | ||||
|                             # ^TODO, name this `brokerd/datad_ts` and | ||||
|                             # use `time.time_ns()` ?? | ||||
|                             'ticks': [{ | ||||
|                                 'type': 'trade', | ||||
|                                 'price': float(trade.price), | ||||
|                                 'size': float(trade.amount), | ||||
|                                 'broker_ts': trade.timestamp, | ||||
|                             }], | ||||
|                         } | ||||
|                         log.info( | ||||
|                             f'deribit {typ!r} quote for {sym!r}\n\n' | ||||
|                             f'{trade}\n\n' | ||||
|                             f'{pfmt(piker_quote)}\n' | ||||
|                         ) | ||||
| 
 | ||||
|                     case 'l1': | ||||
|                         book: cryptofeed.types.L1Book = ref | ||||
| 
 | ||||
|                         # TODO, so this is where we can possibly change things | ||||
|                         # and instead lever the `MktPair.bs_fqme: str` output? | ||||
|                         bs_fqme: str = cb_sym_to_deribit_inst( | ||||
|                             str_to_cb_sym(book.symbol) | ||||
|                         ).lower() | ||||
| 
 | ||||
|                         piker_quote: dict = { | ||||
|                             'symbol': bs_fqme, | ||||
|                             'ticks': [ | ||||
| 
 | ||||
|                                 {'type': 'bid', | ||||
|                                  'price': float(book.bid_price), | ||||
|                                  'size': float(book.bid_size)}, | ||||
| 
 | ||||
|                                 {'type': 'bsize', | ||||
|                                  'price': float(book.bid_price), | ||||
|                                  'size': float(book.bid_size),}, | ||||
| 
 | ||||
|                                 {'type': 'ask', | ||||
|                                  'price': float(book.ask_price), | ||||
|                                  'size': float(book.ask_size),}, | ||||
| 
 | ||||
|                                 {'type': 'asize', | ||||
|                                  'price': float(book.ask_price), | ||||
|                                  'size': float(book.ask_size),} | ||||
|                             ] | ||||
|                         } | ||||
| 
 | ||||
|                 await send_chan.send({ | ||||
|                     topic: quote, | ||||
|                     topic: piker_quote, | ||||
|                 }) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -327,7 +395,6 @@ async def open_symbol_search( | |||
|         await ctx.started() | ||||
| 
 | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             pattern: str | ||||
|             async for pattern in stream: | ||||
| 
 | ||||
|  |  | |||
|  | @ -154,6 +154,7 @@ class JSONRPCResult(Struct): | |||
|     error: Optional[dict] = None | ||||
|     result: Optional[list[dict]] = None | ||||
| 
 | ||||
| 
 | ||||
| class JSONRPCChannel(Struct): | ||||
|     method: str | ||||
|     params: dict | ||||
|  | @ -170,6 +171,7 @@ class KLinesResult(Struct): | |||
|     status: str | ||||
|     volume: list[float] | ||||
| 
 | ||||
| 
 | ||||
| class Trade(Struct): | ||||
|     iv: float | ||||
|     price: float | ||||
|  | @ -188,6 +190,7 @@ class Trade(Struct): | |||
|     block_trade_id: Optional[str] = '', | ||||
|     block_trade_leg_count: Optional[int] = 0, | ||||
| 
 | ||||
| 
 | ||||
| class LastTradesResult(Struct): | ||||
|     trades: list[Trade] | ||||
|     has_more: bool | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue