Compare commits
5 Commits
28e8628c61
...
1705afb607
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 1705afb607 | |
Tyler Goodlet | dafd5a3ca5 | |
Tyler Goodlet | b9dde98d1e | |
Tyler Goodlet | 1616cc0e82 | |
Tyler Goodlet | 0a2ed195a7 |
|
@ -123,14 +123,20 @@ def str_to_cb_sym(name: str) -> Symbol:
|
|||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=new_expiry_date)
|
||||
expiry_date=new_expiry_date
|
||||
)
|
||||
|
||||
|
||||
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||
base, expiry_date, strike_price, option_type = tuple(
|
||||
(
|
||||
base,
|
||||
expiry_date,
|
||||
strike_price,
|
||||
option_type,
|
||||
)= tuple(
|
||||
name.upper().split('-'))
|
||||
|
||||
quote = base
|
||||
quote: str = base
|
||||
|
||||
if option_type == 'P':
|
||||
option_type = PUT
|
||||
|
@ -145,7 +151,8 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
|
|||
type=OPTION,
|
||||
strike_price=strike_price,
|
||||
option_type=option_type,
|
||||
expiry_date=expiry_date)
|
||||
expiry_date=expiry_date
|
||||
)
|
||||
|
||||
|
||||
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||
|
@ -208,7 +215,10 @@ def get_config() -> dict[str, Any]:
|
|||
|
||||
|
||||
class Client:
|
||||
'''
|
||||
Hi-level interface for the jsron-RPC over websocket API.
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
|
@ -609,43 +619,59 @@ async def aio_price_feed_relay(
|
|||
from_trio: asyncio.Queue,
|
||||
to_trio: trio.abc.SendChannel,
|
||||
) -> None:
|
||||
async def _trade(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('trade', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'last': data,
|
||||
'broker_ts': time.time(),
|
||||
'data': data.to_dict(),
|
||||
'receipt': receipt_timestamp
|
||||
}))
|
||||
|
||||
async def _l1(data: dict, receipt_timestamp):
|
||||
to_trio.send_nowait(('l1', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'ticks': [
|
||||
{
|
||||
'type': 'bid',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)
|
||||
},
|
||||
{
|
||||
'type': 'bsize',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)
|
||||
},
|
||||
{
|
||||
'type': 'ask',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)
|
||||
},
|
||||
{
|
||||
'type': 'asize',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)
|
||||
}
|
||||
]
|
||||
}))
|
||||
async def _trade(
|
||||
data: dict,
|
||||
receipt_timestamp: int,
|
||||
) -> None:
|
||||
'''
|
||||
Send `cryptofeed.FeedHandler` quotes to `piker`-side
|
||||
`trio.Task`.
|
||||
|
||||
'''
|
||||
to_trio.send_nowait((
|
||||
'trade', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'last': data,
|
||||
'broker_ts': time.time(),
|
||||
'data': data.to_dict(),
|
||||
'receipt': receipt_timestamp,
|
||||
},
|
||||
))
|
||||
|
||||
async def _l1(
|
||||
data: dict,
|
||||
receipt_timestamp: int,
|
||||
) -> None:
|
||||
to_trio.send_nowait((
|
||||
'l1', {
|
||||
'symbol': cb_sym_to_deribit_inst(
|
||||
str_to_cb_sym(data.symbol)).lower(),
|
||||
'ticks': [
|
||||
{
|
||||
'type': 'bid',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)
|
||||
},
|
||||
{
|
||||
'type': 'bsize',
|
||||
'price': float(data.bid_price),
|
||||
'size': float(data.bid_size)
|
||||
},
|
||||
{
|
||||
'type': 'ask',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)
|
||||
},
|
||||
{
|
||||
'type': 'asize',
|
||||
'price': float(data.ask_price),
|
||||
'size': float(data.ask_size)
|
||||
}
|
||||
]
|
||||
},
|
||||
))
|
||||
sym: Symbol = piker_sym_to_cb_sym(instrument)
|
||||
fh.add_feed(
|
||||
DERIBIT,
|
||||
|
|
|
@ -29,6 +29,7 @@ from typing import (
|
|||
# from pprint import pformat
|
||||
import time
|
||||
|
||||
import cryptofeed
|
||||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
from pendulum import (
|
||||
|
@ -52,19 +53,10 @@ from piker._cacheables import (
|
|||
)
|
||||
from piker.log import (
|
||||
get_logger,
|
||||
mk_repr,
|
||||
)
|
||||
from piker.data.validate import FeedInit
|
||||
|
||||
# from cryptofeed import FeedHandler
|
||||
# from cryptofeed.defines import (
|
||||
# DERIBIT,
|
||||
# L1_BOOK,
|
||||
# TRADES,
|
||||
# OPTION,
|
||||
# CALL,
|
||||
# PUT,
|
||||
# )
|
||||
# from cryptofeed.symbols import Symbol
|
||||
|
||||
from .api import (
|
||||
Client,
|
||||
|
@ -219,51 +211,64 @@ async def get_mkt_info(
|
|||
price_tick=pair.price_tick,
|
||||
size_tick=pair.size_tick,
|
||||
bs_mktid=pair.symbol,
|
||||
expiry=pair.expiry,
|
||||
venue=mkt_mode,
|
||||
broker='deribit',
|
||||
_atype=mkt_mode,
|
||||
_fqme_without_src=True,
|
||||
|
||||
# expiry=pair.expiry,
|
||||
# XXX TODO, currently we don't use it since it's
|
||||
# already "described" in the `OptionPair.symbol: str`
|
||||
# and if we slap in the ISO repr it's kinda hideous..
|
||||
# -[ ] figure out the best either std
|
||||
)
|
||||
return mkt, pair
|
||||
|
||||
|
||||
async def stream_quotes(
|
||||
|
||||
send_chan: trio.abc.SendChannel,
|
||||
symbols: list[str],
|
||||
feed_is_live: trio.Event,
|
||||
loglevel: str = None,
|
||||
|
||||
# startup sync
|
||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Open a live quote stream for the market set defined by `symbols`.
|
||||
|
||||
'''
|
||||
sym = symbols[0].split('.')[0]
|
||||
|
||||
init_msgs: list[FeedInit] = []
|
||||
|
||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
||||
# just that).
|
||||
pfmt: Callable[[str], str] = mk_repr()
|
||||
|
||||
async with (
|
||||
open_cached_client('deribit') as client,
|
||||
send_chan as send_chan
|
||||
):
|
||||
|
||||
mkt: MktPair
|
||||
pair: Pair
|
||||
mkt, pair = await get_mkt_info(sym)
|
||||
|
||||
# build out init msgs according to latest spec
|
||||
init_msgs.append(
|
||||
FeedInit(mkt_info=mkt)
|
||||
FeedInit(
|
||||
mkt_info=mkt,
|
||||
)
|
||||
)
|
||||
nsym = piker_sym_to_cb_sym(sym)
|
||||
# build `cryptofeed` feed-handle
|
||||
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
|
||||
|
||||
async with maybe_open_price_feed(sym) as stream:
|
||||
|
||||
# TODO, uhh use it ?? XD
|
||||
# cache = client._pairs
|
||||
|
||||
last_trades = (await client.last_trades(
|
||||
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||
last_trades = (
|
||||
await client.last_trades(
|
||||
cb_sym_to_deribit_inst(cf_sym),
|
||||
count=1,
|
||||
)
|
||||
).trades
|
||||
|
||||
if len(last_trades) == 0:
|
||||
last_trade = None
|
||||
|
@ -286,16 +291,25 @@ async def stream_quotes(
|
|||
'broker_ts': last_trade.timestamp
|
||||
}]
|
||||
}
|
||||
task_status.started((init_msgs, first_quote))
|
||||
task_status.started((
|
||||
init_msgs,
|
||||
first_quote,
|
||||
))
|
||||
|
||||
feed_is_live.set()
|
||||
|
||||
# NOTE XXX, static for now!
|
||||
# => since this only handles ONE mkt feed at a time we
|
||||
# don't need a lookup table to map interleaved quotes
|
||||
# from multiple possible mkt-pairs
|
||||
topic: str = mkt.bs_fqme
|
||||
|
||||
# deliver until cancelled
|
||||
async for typ, quote in stream:
|
||||
topic: str = quote['symbol']
|
||||
sym: str = quote['symbol']
|
||||
log.info(
|
||||
f'deribit {typ!r} quote\n\n'
|
||||
f'{quote}\n'
|
||||
f'deribit {typ!r} quote for {sym!r}\n\n'
|
||||
f'{pfmt(quote)}\n'
|
||||
)
|
||||
await send_chan.send({
|
||||
topic: quote,
|
||||
|
|
|
@ -26,8 +26,6 @@ from typing import (
|
|||
)
|
||||
from decimal import Decimal
|
||||
|
||||
from msgspec import field
|
||||
|
||||
from piker.types import Struct
|
||||
|
||||
|
||||
|
@ -115,9 +113,13 @@ class OptionPair(Pair, frozen=True):
|
|||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
||||
ns_path: str = 'piker.brokers.deribit:OptionPair'
|
||||
|
||||
# TODO, impl this without the MM:SS part of
|
||||
# the `'THH:MM:SS..'` etc..
|
||||
@property
|
||||
def expiry(self) -> str:
|
||||
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
|
||||
iso_date = pendulum.from_timestamp(
|
||||
self.expiration_timestamp / 1000
|
||||
).isoformat()
|
||||
return iso_date
|
||||
|
||||
@property
|
||||
|
|
|
@ -30,6 +30,7 @@ import time
|
|||
from typing import (
|
||||
Any,
|
||||
AsyncIterator,
|
||||
Callable,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
|
@ -54,6 +55,9 @@ from ._util import (
|
|||
get_console_log,
|
||||
)
|
||||
from ..service import maybe_spawn_daemon
|
||||
from piker.log import (
|
||||
mk_repr,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._sharedmem import (
|
||||
|
@ -575,7 +579,6 @@ async def open_sample_stream(
|
|||
|
||||
|
||||
async def sample_and_broadcast(
|
||||
|
||||
bus: _FeedsBus, # noqa
|
||||
rt_shm: ShmArray,
|
||||
hist_shm: ShmArray,
|
||||
|
@ -596,11 +599,22 @@ async def sample_and_broadcast(
|
|||
|
||||
overruns = Counter()
|
||||
|
||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
||||
# just that).
|
||||
pfmt: Callable[[str], str] = mk_repr()
|
||||
|
||||
# iterate stream delivered by broker
|
||||
async for quotes in quote_stream:
|
||||
# print(quotes)
|
||||
|
||||
# TODO: ``numba`` this!
|
||||
# XXX WARNING XXX only enable for debugging bc ow can cost
|
||||
# ALOT of perf with HF-feedz!!!
|
||||
#
|
||||
# log.info(
|
||||
# 'Rx live quotes:\n'
|
||||
# f'{pfmt(quotes)}'
|
||||
# )
|
||||
|
||||
# TODO: `numba` this!
|
||||
for broker_symbol, quote in quotes.items():
|
||||
# TODO: in theory you can send the IPC msg *before* writing
|
||||
# to the sharedmem array to decrease latency, however, that
|
||||
|
@ -673,6 +687,18 @@ async def sample_and_broadcast(
|
|||
sub_key: str = broker_symbol.lower()
|
||||
subs: set[Sub] = bus.get_subs(sub_key)
|
||||
|
||||
if not subs:
|
||||
all_bs_fqmes: list[str] = list(
|
||||
bus._subscribers.keys()
|
||||
)
|
||||
log.warning(
|
||||
f'No subscribers for {brokername!r} live-quote ??\n'
|
||||
f'broker_symbol: {broker_symbol}\n\n'
|
||||
|
||||
f'Maybe the backend-sys symbol does not match one of,\n'
|
||||
f'{pfmt(all_bs_fqmes)}\n'
|
||||
)
|
||||
|
||||
# NOTE: by default the broker backend doesn't append
|
||||
# it's own "name" into the fqme schema (but maybe it
|
||||
# should?) so we have to manually generate the correct
|
||||
|
|
|
@ -540,7 +540,10 @@ async def open_feed_bus(
|
|||
# subscription since the backend isn't (yet) expected to
|
||||
# append it's own name to the fqme, so we filter on keys
|
||||
# which *do not* include that name (e.g .ib) .
|
||||
bus._subscribers.setdefault(bs_fqme, set())
|
||||
bus._subscribers.setdefault(
|
||||
bs_fqme,
|
||||
set(),
|
||||
)
|
||||
|
||||
# sync feed subscribers with flume handles
|
||||
await ctx.started(
|
||||
|
|
28
piker/log.py
28
piker/log.py
|
@ -18,7 +18,11 @@
|
|||
Log like a forester!
|
||||
"""
|
||||
import logging
|
||||
import reprlib
|
||||
import json
|
||||
from typing import (
|
||||
Callable,
|
||||
)
|
||||
|
||||
import tractor
|
||||
from pygments import (
|
||||
|
@ -84,3 +88,27 @@ def colorize_json(
|
|||
# likeable styles: algol_nu, tango, monokai
|
||||
formatters.TerminalTrueColorFormatter(style=style)
|
||||
)
|
||||
|
||||
|
||||
def mk_repr(
|
||||
**repr_kws,
|
||||
) -> Callable[[str], str]:
|
||||
'''
|
||||
Allocate and deliver a `repr.Repr` instance with provided input
|
||||
settings using the std-lib's `reprlib` mod,
|
||||
* https://docs.python.org/3/library/reprlib.html
|
||||
|
||||
------ Ex. ------
|
||||
An up to 6-layer-nested `dict` as multi-line:
|
||||
- https://stackoverflow.com/a/79102479
|
||||
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
|
||||
|
||||
'''
|
||||
def_kws: dict[str, int] = dict(
|
||||
indent=2,
|
||||
maxlevel=6, # recursion levels
|
||||
maxstring=66, # match editor line-len limit
|
||||
)
|
||||
def_kws |= repr_kws
|
||||
reprr = reprlib.Repr(**def_kws)
|
||||
return reprr.repr
|
||||
|
|
Loading…
Reference in New Issue