Compare commits

..

5 Commits

Author SHA1 Message Date
Tyler Goodlet 1705afb607 `.deribit.feed`: get live quotes workin (again)
The quote-msg `'topic'` field was being set and sent as the
`OptionPair.symbol: str` value instead of as the `MktPair.bs_fqme: str`
as is required for matching on the `piker.data.feed` side. So change to
that and simplify the actual `.bs_fqme: str` value to NOT include the
ISO-format time (for now) since it's a big ugly and longer term we need
a `piker`-fqme friendly-on-ze-eyes format/style anyway..
2024-11-19 21:14:33 -05:00
Tyler Goodlet dafd5a3ca5 Bit more `cryptofeed` adapter formatting and typing for clarity.. 2024-11-19 21:14:11 -05:00
Tyler Goodlet b9dde98d1e .deribit.venues: add todo for an ideal `OptionPair.expiry` fmt/value 2024-11-19 21:13:21 -05:00
Tyler Goodlet 1616cc0e82 `.data._sampling`: warn about subscriber-less msgs
Since it usually means the data-provider backend is keying the msgs
incorrectly (not using the equivalent `MktPair.bs_fqme` which as
would be rendered from the delivered `FeedInit.mkt` instance..) and
reporting the subs list should make it clear how the fqme matching is
off.

Deats,
- use the new `.log.mk_repr()` for a formatter.
- add a commented info emission that can be unmasked to help debug any
  such cases as mentioned in the summary ^^.
2024-11-19 21:09:09 -05:00
Tyler Goodlet 0a2ed195a7 Add `.log.mk_repr()` to create `reprlib.Repr`s 2024-11-19 21:05:16 -05:00
6 changed files with 173 additions and 74 deletions

View File

@ -123,14 +123,20 @@ def str_to_cb_sym(name: str) -> Symbol:
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=new_expiry_date) expiry_date=new_expiry_date
)
def piker_sym_to_cb_sym(name: str) -> Symbol: def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple( (
base,
expiry_date,
strike_price,
option_type,
)= tuple(
name.upper().split('-')) name.upper().split('-'))
quote = base quote: str = base
if option_type == 'P': if option_type == 'P':
option_type = PUT option_type = PUT
@ -145,7 +151,8 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date) expiry_date=expiry_date
)
def cb_sym_to_deribit_inst(sym: Symbol): def cb_sym_to_deribit_inst(sym: Symbol):
@ -208,7 +215,10 @@ def get_config() -> dict[str, Any]:
class Client: class Client:
'''
Hi-level interface for the jsron-RPC over websocket API.
'''
def __init__( def __init__(
self, self,
@ -609,18 +619,33 @@ async def aio_price_feed_relay(
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', { async def _trade(
data: dict,
receipt_timestamp: int,
) -> None:
'''
Send `cryptofeed.FeedHandler` quotes to `piker`-side
`trio.Task`.
'''
to_trio.send_nowait((
'trade', {
'symbol': cb_sym_to_deribit_inst( 'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(), str_to_cb_sym(data.symbol)).lower(),
'last': data, 'last': data,
'broker_ts': time.time(), 'broker_ts': time.time(),
'data': data.to_dict(), 'data': data.to_dict(),
'receipt': receipt_timestamp 'receipt': receipt_timestamp,
})) },
))
async def _l1(data: dict, receipt_timestamp): async def _l1(
to_trio.send_nowait(('l1', { data: dict,
receipt_timestamp: int,
) -> None:
to_trio.send_nowait((
'l1', {
'symbol': cb_sym_to_deribit_inst( 'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(), str_to_cb_sym(data.symbol)).lower(),
'ticks': [ 'ticks': [
@ -645,7 +670,8 @@ async def aio_price_feed_relay(
'size': float(data.ask_size) 'size': float(data.ask_size)
} }
] ]
})) },
))
sym: Symbol = piker_sym_to_cb_sym(instrument) sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,

View File

@ -29,6 +29,7 @@ from typing import (
# from pprint import pformat # from pprint import pformat
import time import time
import cryptofeed
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from pendulum import ( from pendulum import (
@ -52,19 +53,10 @@ from piker._cacheables import (
) )
from piker.log import ( from piker.log import (
get_logger, get_logger,
mk_repr,
) )
from piker.data.validate import FeedInit from piker.data.validate import FeedInit
# from cryptofeed import FeedHandler
# from cryptofeed.defines import (
# DERIBIT,
# L1_BOOK,
# TRADES,
# OPTION,
# CALL,
# PUT,
# )
# from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Client,
@ -219,51 +211,64 @@ async def get_mkt_info(
price_tick=pair.price_tick, price_tick=pair.price_tick,
size_tick=pair.size_tick, size_tick=pair.size_tick,
bs_mktid=pair.symbol, bs_mktid=pair.symbol,
expiry=pair.expiry,
venue=mkt_mode, venue=mkt_mode,
broker='deribit', broker='deribit',
_atype=mkt_mode, _atype=mkt_mode,
_fqme_without_src=True, _fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
) )
return mkt, pair return mkt, pair
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
'''
Open a live quote stream for the market set defined by `symbols`.
'''
sym = symbols[0].split('.')[0] sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = [] init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr()
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan send_chan as send_chan
): ):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym) mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec # build out init msgs according to latest spec
init_msgs.append( init_msgs.append(
FeedInit(mkt_info=mkt) FeedInit(
mkt_info=mkt,
) )
nsym = piker_sym_to_cb_sym(sym) )
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream: async with maybe_open_price_feed(sym) as stream:
last_trades = (
# TODO, uhh use it ?? XD await client.last_trades(
# cache = client._pairs cb_sym_to_deribit_inst(cf_sym),
count=1,
last_trades = (await client.last_trades( )
cb_sym_to_deribit_inst(nsym), count=1)).trades ).trades
if len(last_trades) == 0: if len(last_trades) == 0:
last_trade = None last_trade = None
@ -286,16 +291,25 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp 'broker_ts': last_trade.timestamp
}] }]
} }
task_status.started((init_msgs, first_quote)) task_status.started((
init_msgs,
first_quote,
))
feed_is_live.set() feed_is_live.set()
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled # deliver until cancelled
async for typ, quote in stream: async for typ, quote in stream:
topic: str = quote['symbol'] sym: str = quote['symbol']
log.info( log.info(
f'deribit {typ!r} quote\n\n' f'deribit {typ!r} quote for {sym!r}\n\n'
f'{quote}\n' f'{pfmt(quote)}\n'
) )
await send_chan.send({ await send_chan.send({
topic: quote, topic: quote,

View File

@ -26,8 +26,6 @@ from typing import (
) )
from decimal import Decimal from decimal import Decimal
from msgspec import field
from piker.types import Struct from piker.types import Struct
@ -115,9 +113,13 @@ class OptionPair(Pair, frozen=True):
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair' ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property @property
def expiry(self) -> str: def expiry(self) -> str:
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat() iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date return iso_date
@property @property

View File

@ -30,6 +30,7 @@ import time
from typing import ( from typing import (
Any, Any,
AsyncIterator, AsyncIterator,
Callable,
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -54,6 +55,9 @@ from ._util import (
get_console_log, get_console_log,
) )
from ..service import maybe_spawn_daemon from ..service import maybe_spawn_daemon
from piker.log import (
mk_repr,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from ._sharedmem import ( from ._sharedmem import (
@ -575,7 +579,6 @@ async def open_sample_stream(
async def sample_and_broadcast( async def sample_and_broadcast(
bus: _FeedsBus, # noqa bus: _FeedsBus, # noqa
rt_shm: ShmArray, rt_shm: ShmArray,
hist_shm: ShmArray, hist_shm: ShmArray,
@ -596,11 +599,22 @@ async def sample_and_broadcast(
overruns = Counter() overruns = Counter()
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker # iterate stream delivered by broker
async for quotes in quote_stream: async for quotes in quote_stream:
# print(quotes)
# TODO: ``numba`` this! # XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO: `numba` this!
for broker_symbol, quote in quotes.items(): for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing # TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that # to the sharedmem array to decrease latency, however, that
@ -673,6 +687,18 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower() sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key) subs: set[Sub] = bus.get_subs(sub_key)
if not subs:
all_bs_fqmes: list[str] = list(
bus._subscribers.keys()
)
log.warning(
f'No subscribers for {brokername!r} live-quote ??\n'
f'broker_symbol: {broker_symbol}\n\n'
f'Maybe the backend-sys symbol does not match one of,\n'
f'{pfmt(all_bs_fqmes)}\n'
)
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct # should?) so we have to manually generate the correct

View File

@ -540,7 +540,10 @@ async def open_feed_bus(
# subscription since the backend isn't (yet) expected to # subscription since the backend isn't (yet) expected to
# append it's own name to the fqme, so we filter on keys # append it's own name to the fqme, so we filter on keys
# which *do not* include that name (e.g .ib) . # which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bs_fqme, set()) bus._subscribers.setdefault(
bs_fqme,
set(),
)
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
await ctx.started( await ctx.started(

View File

@ -18,7 +18,11 @@
Log like a forester! Log like a forester!
""" """
import logging import logging
import reprlib
import json import json
from typing import (
Callable,
)
import tractor import tractor
from pygments import ( from pygments import (
@ -84,3 +88,27 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai # likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style) formatters.TerminalTrueColorFormatter(style=style)
) )
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr