`.deribit.feed`: get live quotes workin (again)

The quote-msg `'topic'` field was being set and sent as the
`OptionPair.symbol: str` value instead of as the `MktPair.bs_fqme: str`
as is required for matching on the `piker.data.feed` side. So change to
that and simplify the actual `.bs_fqme: str` value to NOT include the
ISO-format time (for now) since it's a big ugly and longer term we need
a `piker`-fqme friendly-on-ze-eyes format/style anyway..
fix_deribit_hist_queries
Tyler Goodlet 2024-11-19 21:14:33 -05:00
parent dafd5a3ca5
commit 1705afb607
1 changed files with 41 additions and 27 deletions

View File

@ -29,6 +29,7 @@ from typing import (
# from pprint import pformat
import time
import cryptofeed
import trio
from trio_typing import TaskStatus
from pendulum import (
@ -52,19 +53,10 @@ from piker._cacheables import (
)
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
# from cryptofeed import FeedHandler
# from cryptofeed.defines import (
# DERIBIT,
# L1_BOOK,
# TRADES,
# OPTION,
# CALL,
# PUT,
# )
# from cryptofeed.symbols import Symbol
from .api import (
Client,
@ -219,51 +211,64 @@ async def get_mkt_info(
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
expiry=pair.expiry,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Open a live quote stream for the market set defined by `symbols`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr()
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
FeedInit(
mkt_info=mkt,
)
nsym = piker_sym_to_cb_sym(sym)
)
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream:
# TODO, uhh use it ?? XD
# cache = client._pairs
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
last_trades = (
await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
).trades
if len(last_trades) == 0:
last_trade = None
@ -286,16 +291,25 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp
}]
}
task_status.started((init_msgs, first_quote))
task_status.started((
init_msgs,
first_quote,
))
feed_is_live.set()
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, quote in stream:
topic: str = quote['symbol']
sym: str = quote['symbol']
log.info(
f'deribit {typ!r} quote\n\n'
f'{quote}\n'
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{pfmt(quote)}\n'
)
await send_chan.send({
topic: quote,