From 5cefe8bcdbe07e0048878d73083c05c67469cdf2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Nov 2024 14:58:30 -0500 Subject: [PATCH] `deribit.feed`: fix "trade" event streaming The main change needed to make `piker.data.feed._FeedsBus` work was to correctly format the `'trade'` msgs with the (new schema) expected `'ticks': list[dict]` field which, - we compute the `piker` quote-msg-`dict` from the (now directly proxied through) `cryptofeed.types.Trade`'s fields inside the body of `stream_quotes()`. - similarly, move the `'l1'` msg processing, **out of** the `asyncio`-side `_l1()` callback (defined as a closure in `.api.aio_price_feed_relay()` and passed to the `cryptofeed.FeedHandler`) and instead mod the callback to simply pass through the `.types.L1Book` ref directly to the `piker`/`trio` side task for conversion. In support of all that, - mask-to-drop the alt-branch to wait on a first rt event when the `cryptofeed.LastTradesResult.trades: list[Trade]` is empty; doesn't seem like this ever even happens? - add a buncha typing, comments and doc-strs to the routines in `.deribit.api` including notes on where we can choose to mod the `.bs_fqme` for our eventually preferred `piker` style format. - simplify some nested `@acm` enters to the new single `async with )` style. - be particularly pedantic about typing `tractor.to_asyncio.LinkedTaskChannel` - bit of pep8 line-spacing fixes in `.venues`. --- piker/brokers/deribit/api.py | 133 +++++++++++++++++--------------- piker/brokers/deribit/feed.py | 117 ++++++++++++++++++++++------ piker/brokers/deribit/venues.py | 3 + 3 files changed, 167 insertions(+), 86 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 5945e634..f846a5c0 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -55,9 +55,10 @@ from cryptofeed.defines import ( OPTION, CALL, PUT ) from cryptofeed.symbols import Symbol - -# types for managing the cb callbacks. -# from cryptofeed.types import L1Book +from cryptofeed.types import ( + L1Book, + Trade, +) from piker.brokers import SymbolNotFound from .venues import ( _ws_url, @@ -66,9 +67,7 @@ from .venues import ( Pair, OptionPair, JSONRPCResult, - # JSONRPCChannel, KLinesResult, - # Trade, LastTradesResult, ) from piker.accounting import ( @@ -98,9 +97,17 @@ _spawn_kwargs = { } -# convert datetime obj timestamp to unixtime in milliseconds -def deribit_timestamp(when) -> int: - return int((when.timestamp() * 1000) + (when.microsecond / 1000)) +def deribit_timestamp(when: datetime) -> int: + ''' + Convert conventional epoch timestamp, in secs, to unixtime in + milliseconds. + + ''' + return int( + (when.timestamp() * 1000) + + + (when.microsecond / 1000) + ) def str_to_cb_sym(name: str) -> Symbol: @@ -155,11 +162,28 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: ) -def cb_sym_to_deribit_inst(sym: Symbol): - new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) - otype = 'C' if sym.option_type == CALL else 'P' +# TODO, instead can't we just lookup the `MktPair` directly +# and pass it upward to `stream_quotes()`?? +def cb_sym_to_deribit_inst(sym: Symbol) -> str: + ''' + Generate our own internal `str`-repr for a `cryptofeed.Symbol` + uniquely from its fields. - return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' + This is the equiv of generating a `Pair.fmqe` from `cryptofeed` + for now i suppose..? + + ''' + new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) + otype = ( + 'C' if sym.option_type == CALL + else 'P' + ) + return ( + f'{sym.base}-' + f'{new_expiry_date}-' + f'{sym.strike_price}-' + f'{otype}' + ) def get_values_from_cb_normalized_date(expiry_date: str) -> str: @@ -598,7 +622,7 @@ async def get_client( @acm -async def open_feed_handler(): +async def open_feed_handler() -> FeedHandler: fh = FeedHandler(config=get_config()) yield fh await to_asyncio.run_task(fh.stop_async) @@ -619,59 +643,37 @@ async def aio_price_feed_relay( from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, ) -> None: + ''' + Relay price feed quotes from the `cryptofeed.FeedHandler` to + the `piker`-side `trio.task` consumers for delivery to consumer + sub-actors for various subsystems. + ''' async def _trade( - data: dict, + trade: Trade, # cryptofeed, NOT ours from `.venues`! receipt_timestamp: int, ) -> None: ''' - Send `cryptofeed.FeedHandler` quotes to `piker`-side - `trio.Task`. + Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. ''' - to_trio.send_nowait(( - 'trade', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'last': data, - 'broker_ts': time.time(), - 'data': data.to_dict(), - 'receipt': receipt_timestamp, - }, - )) + to_trio.send_nowait(('trade', trade)) async def _l1( - data: dict, + book: L1Book, receipt_timestamp: int, ) -> None: - to_trio.send_nowait(( - 'l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - { - 'type': 'bid', - 'price': float(data.bid_price), - 'size': float(data.bid_size) - }, - { - 'type': 'bsize', - 'price': float(data.bid_price), - 'size': float(data.bid_size) - }, - { - 'type': 'ask', - 'price': float(data.ask_price), - 'size': float(data.ask_size) - }, - { - 'type': 'asize', - 'price': float(data.ask_price), - 'size': float(data.ask_size) - } - ] - }, - )) + ''' + Relay-thru "l1 book" updates. + + ''' + + to_trio.send_nowait(('l1', book)) + + # TODO, make this work! + # -[ ] why isn't this working in `tractor.pause_from_sync()`?? + # breakpoint() + sym: Symbol = piker_sym_to_cb_sym(instrument) fh.add_feed( DERIBIT, @@ -685,27 +687,35 @@ async def aio_price_feed_relay( if not fh.running: fh.run( start_loop=False, - install_signal_handlers=False) + install_signal_handlers=False + ) # sync with trio to_trio.send_nowait(None) + # run until cancelled await asyncio.sleep(float('inf')) @acm async def open_price_feed( instrument: str -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( +) -> to_asyncio.LinkedTaskChannel: + + fh: FeedHandler + first: None + chan: to_asyncio.LinkedTaskChannel + async with ( + maybe_open_feed_handler() as fh, + to_asyncio.open_channel_from( partial( aio_price_feed_relay, fh, instrument ) - ) as (first, chan): - yield chan + ) as (first, chan) + ): + yield chan @acm @@ -714,6 +724,7 @@ async def maybe_open_price_feed( ) -> trio.abc.ReceiveStream: # TODO: add a predicate to maybe_open_context + feed: to_asyncio.LinkedTaskChannel async with maybe_open_context( acm_func=open_price_feed, kwargs={ diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 28c1cbed..efd43ea5 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -63,6 +63,7 @@ from .api import ( # get_config, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, + str_to_cb_sym, maybe_open_price_feed ) from .venues import ( @@ -237,13 +238,19 @@ async def stream_quotes( ''' Open a live quote stream for the market set defined by `symbols`. + Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side + task and relays through L1 and `Trade` msgs here to our `trio.Task`. + ''' sym = symbols[0].split('.')[0] init_msgs: list[FeedInit] = [] # multiline nested `dict` formatter (since rn quote-msgs are # just that). - pfmt: Callable[[str], str] = mk_repr() + pfmt: Callable[[str], str] = mk_repr( + # so we can see `deribit`'s delightfully mega-long bs fields.. + maxstring=100, + ) async with ( open_cached_client('deribit') as client, @@ -262,25 +269,31 @@ async def stream_quotes( # build `cryptofeed` feed-handle cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) - async with maybe_open_price_feed(sym) as stream: - last_trades = ( - await client.last_trades( - cb_sym_to_deribit_inst(cf_sym), - count=1, - ) - ).trades + from_cf: tractor.to_asyncio.LinkedTaskChannel + async with maybe_open_price_feed(sym) as from_cf: - if len(last_trades) == 0: - last_trade = None - async for typ, quote in stream: - if typ == 'trade': - last_trade = Trade(**(quote['data'])) - break + # load the "last trades" summary + last_trades_res: cryptofeed.LastTradesResult = await client.last_trades( + cb_sym_to_deribit_inst(cf_sym), + count=1, + ) + last_trades: list[Trade] = last_trades_res.trades - else: - last_trade = Trade(**(last_trades[0])) + # TODO, do we even need this or will the above always + # work? + # if not last_trades: + # await tractor.pause() + # async for typ, quote in from_cf: + # if typ == 'trade': + # last_trade = Trade(**(quote['data'])) + # break - first_quote = { + # else: + last_trade = Trade( + **(last_trades[0]) + ) + + first_quote: dict = { 'symbol': sym, 'last': last_trade.price, 'brokerd_ts': last_trade.timestamp, @@ -305,14 +318,69 @@ async def stream_quotes( topic: str = mkt.bs_fqme # deliver until cancelled - async for typ, quote in stream: - sym: str = quote['symbol'] - log.info( - f'deribit {typ!r} quote for {sym!r}\n\n' - f'{pfmt(quote)}\n' - ) + async for typ, ref in from_cf: + match typ: + case 'trade': + trade: cryptofeed.types.Trade = ref + + # TODO, re-impl this according to teh ideal + # fqme for opts that we choose!! + bs_fqme: str = cb_sym_to_deribit_inst( + str_to_cb_sym(trade.symbol) + ).lower() + + piker_quote: dict = { + 'symbol': bs_fqme, + 'last': trade.price, + 'broker_ts': time.time(), + # ^TODO, name this `brokerd/datad_ts` and + # use `time.time_ns()` ?? + 'ticks': [{ + 'type': 'trade', + 'price': float(trade.price), + 'size': float(trade.amount), + 'broker_ts': trade.timestamp, + }], + } + log.info( + f'deribit {typ!r} quote for {sym!r}\n\n' + f'{trade}\n\n' + f'{pfmt(piker_quote)}\n' + ) + + case 'l1': + book: cryptofeed.types.L1Book = ref + + # TODO, so this is where we can possibly change things + # and instead lever the `MktPair.bs_fqme: str` output? + bs_fqme: str = cb_sym_to_deribit_inst( + str_to_cb_sym(book.symbol) + ).lower() + + piker_quote: dict = { + 'symbol': bs_fqme, + 'ticks': [ + + {'type': 'bid', + 'price': float(book.bid_price), + 'size': float(book.bid_size)}, + + {'type': 'bsize', + 'price': float(book.bid_price), + 'size': float(book.bid_size),}, + + {'type': 'ask', + 'price': float(book.ask_price), + 'size': float(book.ask_size),}, + + {'type': 'asize', + 'price': float(book.ask_price), + 'size': float(book.ask_size),} + ] + } + await send_chan.send({ - topic: quote, + topic: piker_quote, }) @@ -327,7 +395,6 @@ async def open_symbol_search( await ctx.started() async with ctx.open_stream() as stream: - pattern: str async for pattern in stream: diff --git a/piker/brokers/deribit/venues.py b/piker/brokers/deribit/venues.py index 0179c5f0..0dda913e 100644 --- a/piker/brokers/deribit/venues.py +++ b/piker/brokers/deribit/venues.py @@ -154,6 +154,7 @@ class JSONRPCResult(Struct): error: Optional[dict] = None result: Optional[list[dict]] = None + class JSONRPCChannel(Struct): method: str params: dict @@ -170,6 +171,7 @@ class KLinesResult(Struct): status: str volume: list[float] + class Trade(Struct): iv: float price: float @@ -188,6 +190,7 @@ class Trade(Struct): block_trade_id: Optional[str] = '', block_trade_leg_count: Optional[int] = 0, + class LastTradesResult(Struct): trades: list[Trade] has_more: bool