Compare commits

..

No commits in common. "1705afb6079f48a811290443031a9da26d69fa2e" and "28e8628c61db0ad4ffaf896ed8ff6cddca1b5607" have entirely different histories.

6 changed files with 74 additions and 173 deletions

View File

@ -123,20 +123,14 @@ def str_to_cb_sym(name: str) -> Symbol:
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=new_expiry_date
)
expiry_date=new_expiry_date)
def piker_sym_to_cb_sym(name: str) -> Symbol:
(
base,
expiry_date,
strike_price,
option_type,
)= tuple(
base, expiry_date, strike_price, option_type = tuple(
name.upper().split('-'))
quote: str = base
quote = base
if option_type == 'P':
option_type = PUT
@ -151,8 +145,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date
)
expiry_date=expiry_date)
def cb_sym_to_deribit_inst(sym: Symbol):
@ -215,10 +208,7 @@ def get_config() -> dict[str, Any]:
class Client:
'''
Hi-level interface for the jsron-RPC over websocket API.
'''
def __init__(
self,
@ -619,33 +609,18 @@ async def aio_price_feed_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(
data: dict,
receipt_timestamp: int,
) -> None:
'''
Send `cryptofeed.FeedHandler` quotes to `piker`-side
`trio.Task`.
'''
to_trio.send_nowait((
'trade', {
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp,
},
))
'receipt': receipt_timestamp
}))
async def _l1(
data: dict,
receipt_timestamp: int,
) -> None:
to_trio.send_nowait((
'l1', {
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
@ -670,8 +645,7 @@ async def aio_price_feed_relay(
'size': float(data.ask_size)
}
]
},
))
}))
sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed(
DERIBIT,

View File

@ -29,7 +29,6 @@ from typing import (
# from pprint import pformat
import time
import cryptofeed
import trio
from trio_typing import TaskStatus
from pendulum import (
@ -53,10 +52,19 @@ from piker._cacheables import (
)
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
# from cryptofeed import FeedHandler
# from cryptofeed.defines import (
# DERIBIT,
# L1_BOOK,
# TRADES,
# OPTION,
# CALL,
# PUT,
# )
# from cryptofeed.symbols import Symbol
from .api import (
Client,
@ -211,64 +219,51 @@ async def get_mkt_info(
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
expiry=pair.expiry,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
Open a live quote stream for the market set defined by `symbols`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr()
init_msgs: list[FeedInit] = []
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(
mkt_info=mkt,
FeedInit(mkt_info=mkt)
)
)
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
nsym = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream:
last_trades = (
await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
).trades
# TODO, uhh use it ?? XD
# cache = client._pairs
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
if len(last_trades) == 0:
last_trade = None
@ -291,25 +286,16 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp
}]
}
task_status.started((
init_msgs,
first_quote,
))
task_status.started((init_msgs, first_quote))
feed_is_live.set()
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, quote in stream:
sym: str = quote['symbol']
topic: str = quote['symbol']
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{pfmt(quote)}\n'
f'deribit {typ!r} quote\n\n'
f'{quote}\n'
)
await send_chan.send({
topic: quote,

View File

@ -26,6 +26,8 @@ from typing import (
)
from decimal import Decimal
from msgspec import field
from piker.types import Struct
@ -113,13 +115,9 @@ class OptionPair(Pair, frozen=True):
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
return iso_date
@property

View File

@ -30,7 +30,6 @@ import time
from typing import (
Any,
AsyncIterator,
Callable,
TYPE_CHECKING,
)
@ -55,9 +54,6 @@ from ._util import (
get_console_log,
)
from ..service import maybe_spawn_daemon
from piker.log import (
mk_repr,
)
if TYPE_CHECKING:
from ._sharedmem import (
@ -579,6 +575,7 @@ async def open_sample_stream(
async def sample_and_broadcast(
bus: _FeedsBus, # noqa
rt_shm: ShmArray,
hist_shm: ShmArray,
@ -599,22 +596,11 @@ async def sample_and_broadcast(
overruns = Counter()
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr()
# iterate stream delivered by broker
async for quotes in quote_stream:
# print(quotes)
# XXX WARNING XXX only enable for debugging bc ow can cost
# ALOT of perf with HF-feedz!!!
#
# log.info(
# 'Rx live quotes:\n'
# f'{pfmt(quotes)}'
# )
# TODO: `numba` this!
# TODO: ``numba`` this!
for broker_symbol, quote in quotes.items():
# TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that
@ -687,18 +673,6 @@ async def sample_and_broadcast(
sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key)
if not subs:
all_bs_fqmes: list[str] = list(
bus._subscribers.keys()
)
log.warning(
f'No subscribers for {brokername!r} live-quote ??\n'
f'broker_symbol: {broker_symbol}\n\n'
f'Maybe the backend-sys symbol does not match one of,\n'
f'{pfmt(all_bs_fqmes)}\n'
)
# NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it
# should?) so we have to manually generate the correct

View File

@ -540,10 +540,7 @@ async def open_feed_bus(
# subscription since the backend isn't (yet) expected to
# append it's own name to the fqme, so we filter on keys
# which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(
bs_fqme,
set(),
)
bus._subscribers.setdefault(bs_fqme, set())
# sync feed subscribers with flume handles
await ctx.started(

View File

@ -18,11 +18,7 @@
Log like a forester!
"""
import logging
import reprlib
import json
from typing import (
Callable,
)
import tractor
from pygments import (
@ -88,27 +84,3 @@ def colorize_json(
# likeable styles: algol_nu, tango, monokai
formatters.TerminalTrueColorFormatter(style=style)
)
def mk_repr(
**repr_kws,
) -> Callable[[str], str]:
'''
Allocate and deliver a `repr.Repr` instance with provided input
settings using the std-lib's `reprlib` mod,
* https://docs.python.org/3/library/reprlib.html
------ Ex. ------
An up to 6-layer-nested `dict` as multi-line:
- https://stackoverflow.com/a/79102479
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
'''
def_kws: dict[str, int] = dict(
indent=2,
maxlevel=6, # recursion levels
maxstring=66, # match editor line-len limit
)
def_kws |= repr_kws
reprr = reprlib.Repr(**def_kws)
return reprr.repr