`deribit.feed`: fix "trade" event streaming

The main change needed to make `piker.data.feed._FeedsBus` work was
to correctly format the `'trade'` msgs with the (new schema) expected
`'ticks': list[dict]` field which,
- we compute the `piker` quote-msg-`dict` from the (now directly proxied through)
  `cryptofeed.types.Trade`'s fields inside the body of `stream_quotes()`.
- similarly, move the `'l1'` msg processing, **out of** the `asyncio`-side
  `_l1()` callback (defined as a closure in `.api.aio_price_feed_relay()`
  and passed to the `cryptofeed.FeedHandler`) and instead mod the
  callback to simply pass through the `.types.L1Book` ref directly to
  the `piker`/`trio` side task for conversion.

In support of all that,
- mask-to-drop the alt-branch to wait on a first rt event when the
  `cryptofeed.LastTradesResult.trades: list[Trade]` is empty; doesn't
  seem like this ever even happens?
- add a buncha typing, comments and doc-strs to the routines in
  `.deribit.api` including notes on where we can choose to mod the
  `.bs_fqme` for our eventually preferred `piker` style format.
- simplify some nested `@acm` enters to the new single `async with
  <tuple>)` style.
- be particularly pedantic about typing
  `tractor.to_asyncio.LinkedTaskChannel`
- bit of pep8 line-spacing fixes in `.venues`.
fix_deribit_hist_queries
Tyler Goodlet 2024-11-22 14:58:30 -05:00
parent d96e9d4f11
commit 5cefe8bcdb
3 changed files with 167 additions and 86 deletions

View File

@ -55,9 +55,10 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
from cryptofeed.types import (
# types for managing the cb callbacks. L1Book,
# from cryptofeed.types import L1Book Trade,
)
from piker.brokers import SymbolNotFound from piker.brokers import SymbolNotFound
from .venues import ( from .venues import (
_ws_url, _ws_url,
@ -66,9 +67,7 @@ from .venues import (
Pair, Pair,
OptionPair, OptionPair,
JSONRPCResult, JSONRPCResult,
# JSONRPCChannel,
KLinesResult, KLinesResult,
# Trade,
LastTradesResult, LastTradesResult,
) )
from piker.accounting import ( from piker.accounting import (
@ -98,9 +97,17 @@ _spawn_kwargs = {
} }
# convert datetime obj timestamp to unixtime in milliseconds def deribit_timestamp(when: datetime) -> int:
def deribit_timestamp(when) -> int: '''
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) Convert conventional epoch timestamp, in secs, to unixtime in
milliseconds.
'''
return int(
(when.timestamp() * 1000)
+
(when.microsecond / 1000)
)
def str_to_cb_sym(name: str) -> Symbol: def str_to_cb_sym(name: str) -> Symbol:
@ -155,11 +162,28 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
) )
def cb_sym_to_deribit_inst(sym: Symbol): # TODO, instead can't we just lookup the `MktPair` directly
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) # and pass it upward to `stream_quotes()`??
otype = 'C' if sym.option_type == CALL else 'P' def cb_sym_to_deribit_inst(sym: Symbol) -> str:
'''
Generate our own internal `str`-repr for a `cryptofeed.Symbol`
uniquely from its fields.
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
for now i suppose..?
'''
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = (
'C' if sym.option_type == CALL
else 'P'
)
return (
f'{sym.base}-'
f'{new_expiry_date}-'
f'{sym.strike_price}-'
f'{otype}'
)
def get_values_from_cb_normalized_date(expiry_date: str) -> str: def get_values_from_cb_normalized_date(expiry_date: str) -> str:
@ -598,7 +622,7 @@ async def get_client(
@acm @acm
async def open_feed_handler(): async def open_feed_handler() -> FeedHandler:
fh = FeedHandler(config=get_config()) fh = FeedHandler(config=get_config())
yield fh yield fh
await to_asyncio.run_task(fh.stop_async) await to_asyncio.run_task(fh.stop_async)
@ -619,59 +643,37 @@ async def aio_price_feed_relay(
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
'''
Relay price feed quotes from the `cryptofeed.FeedHandler` to
the `piker`-side `trio.task` consumers for delivery to consumer
sub-actors for various subsystems.
'''
async def _trade( async def _trade(
data: dict, trade: Trade, # cryptofeed, NOT ours from `.venues`!
receipt_timestamp: int, receipt_timestamp: int,
) -> None: ) -> None:
''' '''
Send `cryptofeed.FeedHandler` quotes to `piker`-side Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
`trio.Task`.
''' '''
to_trio.send_nowait(( to_trio.send_nowait(('trade', trade))
'trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp,
},
))
async def _l1( async def _l1(
data: dict, book: L1Book,
receipt_timestamp: int, receipt_timestamp: int,
) -> None: ) -> None:
to_trio.send_nowait(( '''
'l1', { Relay-thru "l1 book" updates.
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(), '''
'ticks': [
{ to_trio.send_nowait(('l1', book))
'type': 'bid',
'price': float(data.bid_price), # TODO, make this work!
'size': float(data.bid_size) # -[ ] why isn't this working in `tractor.pause_from_sync()`??
}, # breakpoint()
{
'type': 'bsize',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
]
},
))
sym: Symbol = piker_sym_to_cb_sym(instrument) sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,
@ -685,27 +687,35 @@ async def aio_price_feed_relay(
if not fh.running: if not fh.running:
fh.run( fh.run(
start_loop=False, start_loop=False,
install_signal_handlers=False) install_signal_handlers=False
)
# sync with trio # sync with trio
to_trio.send_nowait(None) to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@acm @acm
async def open_price_feed( async def open_price_feed(
instrument: str instrument: str
) -> trio.abc.ReceiveStream: ) -> to_asyncio.LinkedTaskChannel:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial( partial(
aio_price_feed_relay, aio_price_feed_relay,
fh, fh,
instrument instrument
) )
) as (first, chan): ) as (first, chan)
yield chan ):
yield chan
@acm @acm
@ -714,6 +724,7 @@ async def maybe_open_price_feed(
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context # TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context( async with maybe_open_context(
acm_func=open_price_feed, acm_func=open_price_feed,
kwargs={ kwargs={

View File

@ -63,6 +63,7 @@ from .api import (
# get_config, # get_config,
piker_sym_to_cb_sym, piker_sym_to_cb_sym,
cb_sym_to_deribit_inst, cb_sym_to_deribit_inst,
str_to_cb_sym,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import ( from .venues import (
@ -237,13 +238,19 @@ async def stream_quotes(
''' '''
Open a live quote stream for the market set defined by `symbols`. Open a live quote stream for the market set defined by `symbols`.
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
''' '''
sym = symbols[0].split('.')[0] sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = [] init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are # multiline nested `dict` formatter (since rn quote-msgs are
# just that). # just that).
pfmt: Callable[[str], str] = mk_repr() pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
@ -262,25 +269,31 @@ async def stream_quotes(
# build `cryptofeed` feed-handle # build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream: from_cf: tractor.to_asyncio.LinkedTaskChannel
last_trades = ( async with maybe_open_price_feed(sym) as from_cf:
await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
).trades
if len(last_trades) == 0: # load the "last trades" summary
last_trade = None last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
async for typ, quote in stream: cb_sym_to_deribit_inst(cf_sym),
if typ == 'trade': count=1,
last_trade = Trade(**(quote['data'])) )
break last_trades: list[Trade] = last_trades_res.trades
else: # TODO, do we even need this or will the above always
last_trade = Trade(**(last_trades[0])) # work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
first_quote = { # else:
last_trade = Trade(
**(last_trades[0])
)
first_quote: dict = {
'symbol': sym, 'symbol': sym,
'last': last_trade.price, 'last': last_trade.price,
'brokerd_ts': last_trade.timestamp, 'brokerd_ts': last_trade.timestamp,
@ -305,14 +318,69 @@ async def stream_quotes(
topic: str = mkt.bs_fqme topic: str = mkt.bs_fqme
# deliver until cancelled # deliver until cancelled
async for typ, quote in stream: async for typ, ref in from_cf:
sym: str = quote['symbol'] match typ:
log.info( case 'trade':
f'deribit {typ!r} quote for {sym!r}\n\n' trade: cryptofeed.types.Trade = ref
f'{pfmt(quote)}\n'
) # TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({ await send_chan.send({
topic: quote, topic: piker_quote,
}) })
@ -327,7 +395,6 @@ async def open_symbol_search(
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str pattern: str
async for pattern in stream: async for pattern in stream:

View File

@ -154,6 +154,7 @@ class JSONRPCResult(Struct):
error: Optional[dict] = None error: Optional[dict] = None
result: Optional[list[dict]] = None result: Optional[list[dict]] = None
class JSONRPCChannel(Struct): class JSONRPCChannel(Struct):
method: str method: str
params: dict params: dict
@ -170,6 +171,7 @@ class KLinesResult(Struct):
status: str status: str
volume: list[float] volume: list[float]
class Trade(Struct): class Trade(Struct):
iv: float iv: float
price: float price: float
@ -188,6 +190,7 @@ class Trade(Struct):
block_trade_id: Optional[str] = '', block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0, block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct): class LastTradesResult(Struct):
trades: list[Trade] trades: list[Trade]
has_more: bool has_more: bool