Compare commits


No commits in common. "1705afb6079f48a811290443031a9da26d69fa2e" and "28e8628c61db0ad4ffaf896ed8ff6cddca1b5607" have entirely different histories.

6 changed files with 74 additions and 173 deletions

View File

@@ -123,20 +123,14 @@ def str_to_cb_sym(name: str) -> Symbol:
         type=OPTION,
         strike_price=strike_price,
         option_type=option_type,
-        expiry_date=new_expiry_date
-    )
+        expiry_date=new_expiry_date)


 def piker_sym_to_cb_sym(name: str) -> Symbol:
-    (
-        base,
-        expiry_date,
-        strike_price,
-        option_type,
-    )= tuple(
+    base, expiry_date, strike_price, option_type = tuple(
         name.upper().split('-'))

-    quote: str = base
+    quote = base

     if option_type == 'P':
         option_type = PUT
@@ -151,8 +145,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
         type=OPTION,
         strike_price=strike_price,
         option_type=option_type,
-        expiry_date=expiry_date
-    )
+        expiry_date=expiry_date)


 def cb_sym_to_deribit_inst(sym: Symbol):
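
Both sides of `piker_sym_to_cb_sym()` above unpack a Deribit-style option name by splitting on '-' into base, expiry, strike and option type before building a `cryptofeed` `Symbol`. A minimal standalone sketch of just that parsing step, with an assumed example instrument name (not taken from this diff):

# Illustrative parsing only; mirrors the `name.upper().split('-')` logic
# in the hunk above. The sample name below is an assumption.
def parse_deribit_option(name: str) -> dict:
    base, expiry_date, strike_price, option_type = name.upper().split('-')
    return {
        'base': base,                  # e.g. 'BTC'
        'expiry_date': expiry_date,    # e.g. '27JUN25'
        'strike_price': strike_price,  # e.g. '100000'
        'is_put': option_type == 'P',  # 'P' -> put, 'C' -> call
    }

print(parse_deribit_option('btc-27jun25-100000-c'))
# -> {'base': 'BTC', 'expiry_date': '27JUN25', 'strike_price': '100000', 'is_put': False}
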
@@ -215,10 +208,7 @@ def get_config() -> dict[str, Any]:

 class Client:
-    '''
-    Hi-level interface for the jsron-RPC over websocket API.
-    '''
     def __init__(
         self,
@@ -619,59 +609,43 @@ async def aio_price_feed_relay(
     from_trio: asyncio.Queue,
     to_trio: trio.abc.SendChannel,
 ) -> None:

-    async def _trade(
-        data: dict,
-        receipt_timestamp: int,
-    ) -> None:
-        '''
-        Send `cryptofeed.FeedHandler` quotes to `piker`-side
-        `trio.Task`.
-
-        '''
-        to_trio.send_nowait((
-            'trade', {
-                'symbol': cb_sym_to_deribit_inst(
-                    str_to_cb_sym(data.symbol)).lower(),
-                'last': data,
-                'broker_ts': time.time(),
-                'data': data.to_dict(),
-                'receipt': receipt_timestamp,
-            },
-        ))
-
-    async def _l1(
-        data: dict,
-        receipt_timestamp: int,
-    ) -> None:
-        to_trio.send_nowait((
-            'l1', {
-                'symbol': cb_sym_to_deribit_inst(
-                    str_to_cb_sym(data.symbol)).lower(),
-                'ticks': [
-                    {
-                        'type': 'bid',
-                        'price': float(data.bid_price),
-                        'size': float(data.bid_size)
-                    },
-                    {
-                        'type': 'bsize',
-                        'price': float(data.bid_price),
-                        'size': float(data.bid_size)
-                    },
-                    {
-                        'type': 'ask',
-                        'price': float(data.ask_price),
-                        'size': float(data.ask_size)
-                    },
-                    {
-                        'type': 'asize',
-                        'price': float(data.ask_price),
-                        'size': float(data.ask_size)
-                    }
-                ]
-            },
-        ))
+    async def _trade(data: dict, receipt_timestamp):
+        to_trio.send_nowait(('trade', {
+            'symbol': cb_sym_to_deribit_inst(
+                str_to_cb_sym(data.symbol)).lower(),
+            'last': data,
+            'broker_ts': time.time(),
+            'data': data.to_dict(),
+            'receipt': receipt_timestamp
+        }))
+
+    async def _l1(data: dict, receipt_timestamp):
+        to_trio.send_nowait(('l1', {
+            'symbol': cb_sym_to_deribit_inst(
+                str_to_cb_sym(data.symbol)).lower(),
+            'ticks': [
+                {
+                    'type': 'bid',
+                    'price': float(data.bid_price),
+                    'size': float(data.bid_size)
+                },
+                {
+                    'type': 'bsize',
+                    'price': float(data.bid_price),
+                    'size': float(data.bid_size)
+                },
+                {
+                    'type': 'ask',
+                    'price': float(data.ask_price),
+                    'size': float(data.ask_size)
+                },
+                {
+                    'type': 'asize',
+                    'price': float(data.ask_price),
+                    'size': float(data.ask_size)
+                }
+            ]
+        }))

     sym: Symbol = piker_sym_to_cb_sym(instrument)
     fh.add_feed(
         DERIBIT,
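
The trailing context above (`fh.add_feed(DERIBIT, ...`) is where these `_trade`/`_l1` callbacks get registered with a `cryptofeed.FeedHandler`. The full argument list is cut off by the hunk, so the sketch below is an assumption about how such a registration typically looks with cryptofeed's public API, not the exact call in this file:

# Hedged sketch: registering trade/L1 callbacks on a cryptofeed FeedHandler.
# The channel combo, instrument name and callback bodies are assumptions.
from cryptofeed import FeedHandler
from cryptofeed.defines import DERIBIT, L1_BOOK, TRADES

async def on_trade(data, receipt_timestamp):
    print('trade', data.symbol, receipt_timestamp)

async def on_l1(data, receipt_timestamp):
    print('l1', data.symbol, data.bid_price, data.ask_price)

fh = FeedHandler()
fh.add_feed(
    DERIBIT,
    channels=[TRADES, L1_BOOK],
    symbols=['BTC-27JUN25-100000-C'],  # placeholder; cryptofeed's normalized naming may differ
    callbacks={TRADES: on_trade, L1_BOOK: on_l1},
)
fh.run()  # blocks, driving the asyncio loop that fires the callbacks
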

View File

@@ -29,7 +29,6 @@ from typing import (
 # from pprint import pformat
 import time

-import cryptofeed
 import trio
 from trio_typing import TaskStatus
 from pendulum import (
@@ -53,10 +52,19 @@ from piker._cacheables import (
 )
 from piker.log import (
     get_logger,
-    mk_repr,
 )
 from piker.data.validate import FeedInit

+# from cryptofeed import FeedHandler
+# from cryptofeed.defines import (
+#     DERIBIT,
+#     L1_BOOK,
+#     TRADES,
+#     OPTION,
+#     CALL,
+#     PUT,
+# )
+# from cryptofeed.symbols import Symbol

 from .api import (
     Client,
@@ -211,64 +219,51 @@ async def get_mkt_info(
         price_tick=pair.price_tick,
         size_tick=pair.size_tick,
         bs_mktid=pair.symbol,
+        expiry=pair.expiry,
         venue=mkt_mode,
         broker='deribit',
         _atype=mkt_mode,
         _fqme_without_src=True,
-
-        # expiry=pair.expiry,
-        # XXX TODO, currently we don't use it since it's
-        # already "described" in the `OptionPair.symbol: str`
-        # and if we slap in the ISO repr it's kinda hideous..
-        # -[ ] figure out the best either std
     )
     return mkt, pair


 async def stream_quotes(
     send_chan: trio.abc.SendChannel,
     symbols: list[str],
     feed_is_live: trio.Event,
+    loglevel: str = None,

     # startup sync
     task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,

 ) -> None:
-    '''
-    Open a live quote stream for the market set defined by `symbols`.
-
-    '''
     sym = symbols[0].split('.')[0]
-    init_msgs: list[FeedInit] = []

-    # multiline nested `dict` formatter (since rn quote-msgs are
-    # just that).
-    pfmt: Callable[[str], str] = mk_repr()
+    init_msgs: list[FeedInit] = []

     async with (
         open_cached_client('deribit') as client,
         send_chan as send_chan
     ):
-        mkt: MktPair
-        pair: Pair
         mkt, pair = await get_mkt_info(sym)

         # build out init msgs according to latest spec
         init_msgs.append(
-            FeedInit(
-                mkt_info=mkt,
-            )
+            FeedInit(mkt_info=mkt)
         )
-        # build `cryptofeed` feed-handle
-        cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
+        nsym = piker_sym_to_cb_sym(sym)

         async with maybe_open_price_feed(sym) as stream:

-            last_trades = (
-                await client.last_trades(
-                    cb_sym_to_deribit_inst(cf_sym),
-                    count=1,
-                )
-            ).trades
+            # TODO, uhh use it ?? XD
+            # cache = client._pairs
+
+            last_trades = (await client.last_trades(
+                cb_sym_to_deribit_inst(nsym), count=1)).trades

             if len(last_trades) == 0:
                 last_trade = None
@@ -291,25 +286,16 @@ async def stream_quotes(
                     'broker_ts': last_trade.timestamp
                 }]
             }
-            task_status.started((
-                init_msgs,
-                first_quote,
-            ))
+            task_status.started((init_msgs, first_quote))

             feed_is_live.set()

-            # NOTE XXX, static for now!
-            # => since this only handles ONE mkt feed at a time we
-            # don't need a lookup table to map interleaved quotes
-            # from multiple possible mkt-pairs
-            topic: str = mkt.bs_fqme
-
             # deliver until cancelled
             async for typ, quote in stream:
-                sym: str = quote['symbol']
+                topic: str = quote['symbol']
                 log.info(
-                    f'deribit {typ!r} quote for {sym!r}\n\n'
-                    f'{pfmt(quote)}\n'
+                    f'deribit {typ!r} quote\n\n'
+                    f'{quote}\n'
                 )
                 await send_chan.send({
                     topic: quote,
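
The compacted `task_status.started((init_msgs, first_quote))` on the added side is trio's standard startup handshake: whatever tuple is passed to `task_status.started()` is returned to the parent's `nursery.start()` call. A self-contained sketch of that pattern (the payload names here are placeholders, not piker's actual messages):

import trio

async def quote_streamer(
    *,
    task_status: trio.TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
    init_msgs = [{'mkt_info': 'example'}]
    first_quote = {'last': 123.45}
    # hand both back to whoever called `nursery.start()`
    task_status.started((init_msgs, first_quote))
    await trio.sleep(0.1)  # ...keep streaming after startup

async def main() -> None:
    async with trio.open_nursery() as n:
        init_msgs, first_quote = await n.start(quote_streamer)
        print(init_msgs, first_quote)

trio.run(main)
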

View File

@@ -26,6 +26,8 @@ from typing import (
 )
 from decimal import Decimal

+from msgspec import field
+
 from piker.types import Struct
@@ -113,13 +115,9 @@ class OptionPair(Pair, frozen=True):
     # NOTE: see `.data._symcache.SymbologyCache.load()` for why
     ns_path: str = 'piker.brokers.deribit:OptionPair'

-    # TODO, impl this without the MM:SS part of
-    # the `'THH:MM:SS..'` etc..
     @property
     def expiry(self) -> str:
-        iso_date = pendulum.from_timestamp(
-            self.expiration_timestamp / 1000
-        ).isoformat()
+        iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
         return iso_date

     @property
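
Both sides of the `expiry` property do the same thing, convert Deribit's millisecond `expiration_timestamp` into an ISO-8601 string with pendulum; only the line wrapping changes. A quick standalone check of that conversion using an arbitrary example timestamp:

import pendulum

expiration_timestamp = 1735286400000  # arbitrary example value (ms)
iso_date = pendulum.from_timestamp(expiration_timestamp / 1000).isoformat()
print(iso_date)  # 2024-12-27T08:00:00+00:00
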

View File

@@ -30,7 +30,6 @@ import time
 from typing import (
     Any,
     AsyncIterator,
-    Callable,
     TYPE_CHECKING,
 )
@@ -55,9 +54,6 @@ from ._util import (
     get_console_log,
 )
 from ..service import maybe_spawn_daemon
-from piker.log import (
-    mk_repr,
-)

 if TYPE_CHECKING:
     from ._sharedmem import (
@@ -579,6 +575,7 @@ async def open_sample_stream(

 async def sample_and_broadcast(
     bus: _FeedsBus,  # noqa
     rt_shm: ShmArray,
     hist_shm: ShmArray,
@@ -599,22 +596,11 @@ async def sample_and_broadcast(
     overruns = Counter()

-    # multiline nested `dict` formatter (since rn quote-msgs are
-    # just that).
-    pfmt: Callable[[str], str] = mk_repr()
-
     # iterate stream delivered by broker
     async for quotes in quote_stream:
-        # XXX WARNING XXX only enable for debugging bc ow can cost
-        # ALOT of perf with HF-feedz!!!
-        #
-        # log.info(
-        #     'Rx live quotes:\n'
-        #     f'{pfmt(quotes)}'
-        # )
+        # print(quotes)

-        # TODO: `numba` this!
+        # TODO: ``numba`` this!
         for broker_symbol, quote in quotes.items():
             # TODO: in theory you can send the IPC msg *before* writing
             # to the sharedmem array to decrease latency, however, that
@@ -687,18 +673,6 @@ async def sample_and_broadcast(
             sub_key: str = broker_symbol.lower()
             subs: set[Sub] = bus.get_subs(sub_key)
-            if not subs:
-                all_bs_fqmes: list[str] = list(
-                    bus._subscribers.keys()
-                )
-                log.warning(
-                    f'No subscribers for {brokername!r} live-quote ??\n'
-                    f'broker_symbol: {broker_symbol}\n\n'
-                    f'Maybe the backend-sys symbol does not match one of,\n'
-                    f'{pfmt(all_bs_fqmes)}\n'
-                )

             # NOTE: by default the broker backend doesn't append
             # it's own "name" into the fqme schema (but maybe it
             # should?) so we have to manually generate the correct

View File

@@ -540,10 +540,7 @@ async def open_feed_bus(
         # subscription since the backend isn't (yet) expected to
         # append it's own name to the fqme, so we filter on keys
         # which *do not* include that name (e.g .ib) .
-        bus._subscribers.setdefault(
-            bs_fqme,
-            set(),
-        )
+        bus._subscribers.setdefault(bs_fqme, set())

         # sync feed subscribers with flume handles
         await ctx.started(

View File

@@ -18,11 +18,7 @@
 Log like a forester!
 """
 import logging
-import reprlib
 import json
-from typing import (
-    Callable,
-)

 import tractor
 from pygments import (
@@ -88,27 +84,3 @@ def colorize_json(
         # likeable styles: algol_nu, tango, monokai
         formatters.TerminalTrueColorFormatter(style=style)
     )
-
-
-def mk_repr(
-    **repr_kws,
-) -> Callable[[str], str]:
-    '''
-    Allocate and deliver a `repr.Repr` instance with provided input
-    settings using the std-lib's `reprlib` mod,
-     * https://docs.python.org/3/library/reprlib.html
-
-    ------ Ex. ------
-    An up to 6-layer-nested `dict` as multi-line:
-    - https://stackoverflow.com/a/79102479
-    - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
-
-    '''
-    def_kws: dict[str, int] = dict(
-        indent=2,
-        maxlevel=6,  # recursion levels
-        maxstring=66,  # match editor line-len limit
-    )
-    def_kws |= repr_kws
-    reprr = reprlib.Repr(**def_kws)
-    return reprr.repr
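
The removed `mk_repr()` helper is a thin wrapper around the std-lib `reprlib.Repr` class; the keyword-argument constructor and `indent` support it relies on are only available on Python 3.12+. A small usage sketch of the same idea under that assumption (the sample quote dict is made up):

# Assumes Python 3.12+, where reprlib.Repr accepts these keyword args and
# can emit multi-line, indented output for nested containers.
import reprlib

pfmt = reprlib.Repr(
    indent=2,      # pretty-print nested containers across lines
    maxlevel=6,    # recursion depth before eliding with '...'
    maxstring=66,  # truncate long strings
).repr

quote = {
    'symbol': 'btc-27jun25-100000-c',
    'ticks': [{'type': 'bid', 'price': 0.042, 'size': 10.0}],
}
print(pfmt(quote))
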