Compare commits
No commits in common. "1705afb6079f48a811290443031a9da26d69fa2e" and "28e8628c61db0ad4ffaf896ed8ff6cddca1b5607" have entirely different histories.
1705afb607
...
28e8628c61
|
@ -123,20 +123,14 @@ def str_to_cb_sym(name: str) -> Symbol:
|
||||||
type=OPTION,
|
type=OPTION,
|
||||||
strike_price=strike_price,
|
strike_price=strike_price,
|
||||||
option_type=option_type,
|
option_type=option_type,
|
||||||
expiry_date=new_expiry_date
|
expiry_date=new_expiry_date)
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||||
(
|
base, expiry_date, strike_price, option_type = tuple(
|
||||||
base,
|
|
||||||
expiry_date,
|
|
||||||
strike_price,
|
|
||||||
option_type,
|
|
||||||
)= tuple(
|
|
||||||
name.upper().split('-'))
|
name.upper().split('-'))
|
||||||
|
|
||||||
quote: str = base
|
quote = base
|
||||||
|
|
||||||
if option_type == 'P':
|
if option_type == 'P':
|
||||||
option_type = PUT
|
option_type = PUT
|
||||||
|
@ -151,8 +145,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
|
||||||
type=OPTION,
|
type=OPTION,
|
||||||
strike_price=strike_price,
|
strike_price=strike_price,
|
||||||
option_type=option_type,
|
option_type=option_type,
|
||||||
expiry_date=expiry_date
|
expiry_date=expiry_date)
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def cb_sym_to_deribit_inst(sym: Symbol):
|
def cb_sym_to_deribit_inst(sym: Symbol):
|
||||||
|
@ -215,10 +208,7 @@ def get_config() -> dict[str, Any]:
|
||||||
|
|
||||||
|
|
||||||
class Client:
|
class Client:
|
||||||
'''
|
|
||||||
Hi-level interface for the jsron-RPC over websocket API.
|
|
||||||
|
|
||||||
'''
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
|
@ -619,59 +609,43 @@ async def aio_price_feed_relay(
|
||||||
from_trio: asyncio.Queue,
|
from_trio: asyncio.Queue,
|
||||||
to_trio: trio.abc.SendChannel,
|
to_trio: trio.abc.SendChannel,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
async def _trade(data: dict, receipt_timestamp):
|
||||||
|
to_trio.send_nowait(('trade', {
|
||||||
|
'symbol': cb_sym_to_deribit_inst(
|
||||||
|
str_to_cb_sym(data.symbol)).lower(),
|
||||||
|
'last': data,
|
||||||
|
'broker_ts': time.time(),
|
||||||
|
'data': data.to_dict(),
|
||||||
|
'receipt': receipt_timestamp
|
||||||
|
}))
|
||||||
|
|
||||||
async def _trade(
|
async def _l1(data: dict, receipt_timestamp):
|
||||||
data: dict,
|
to_trio.send_nowait(('l1', {
|
||||||
receipt_timestamp: int,
|
'symbol': cb_sym_to_deribit_inst(
|
||||||
) -> None:
|
str_to_cb_sym(data.symbol)).lower(),
|
||||||
'''
|
'ticks': [
|
||||||
Send `cryptofeed.FeedHandler` quotes to `piker`-side
|
{
|
||||||
`trio.Task`.
|
'type': 'bid',
|
||||||
|
'price': float(data.bid_price),
|
||||||
'''
|
'size': float(data.bid_size)
|
||||||
to_trio.send_nowait((
|
},
|
||||||
'trade', {
|
{
|
||||||
'symbol': cb_sym_to_deribit_inst(
|
'type': 'bsize',
|
||||||
str_to_cb_sym(data.symbol)).lower(),
|
'price': float(data.bid_price),
|
||||||
'last': data,
|
'size': float(data.bid_size)
|
||||||
'broker_ts': time.time(),
|
},
|
||||||
'data': data.to_dict(),
|
{
|
||||||
'receipt': receipt_timestamp,
|
'type': 'ask',
|
||||||
},
|
'price': float(data.ask_price),
|
||||||
))
|
'size': float(data.ask_size)
|
||||||
|
},
|
||||||
async def _l1(
|
{
|
||||||
data: dict,
|
'type': 'asize',
|
||||||
receipt_timestamp: int,
|
'price': float(data.ask_price),
|
||||||
) -> None:
|
'size': float(data.ask_size)
|
||||||
to_trio.send_nowait((
|
}
|
||||||
'l1', {
|
]
|
||||||
'symbol': cb_sym_to_deribit_inst(
|
}))
|
||||||
str_to_cb_sym(data.symbol)).lower(),
|
|
||||||
'ticks': [
|
|
||||||
{
|
|
||||||
'type': 'bid',
|
|
||||||
'price': float(data.bid_price),
|
|
||||||
'size': float(data.bid_size)
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'type': 'bsize',
|
|
||||||
'price': float(data.bid_price),
|
|
||||||
'size': float(data.bid_size)
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'type': 'ask',
|
|
||||||
'price': float(data.ask_price),
|
|
||||||
'size': float(data.ask_size)
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'type': 'asize',
|
|
||||||
'price': float(data.ask_price),
|
|
||||||
'size': float(data.ask_size)
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
|
||||||
))
|
|
||||||
sym: Symbol = piker_sym_to_cb_sym(instrument)
|
sym: Symbol = piker_sym_to_cb_sym(instrument)
|
||||||
fh.add_feed(
|
fh.add_feed(
|
||||||
DERIBIT,
|
DERIBIT,
|
||||||
|
|
|
@ -29,7 +29,6 @@ from typing import (
|
||||||
# from pprint import pformat
|
# from pprint import pformat
|
||||||
import time
|
import time
|
||||||
|
|
||||||
import cryptofeed
|
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
from pendulum import (
|
from pendulum import (
|
||||||
|
@ -53,10 +52,19 @@ from piker._cacheables import (
|
||||||
)
|
)
|
||||||
from piker.log import (
|
from piker.log import (
|
||||||
get_logger,
|
get_logger,
|
||||||
mk_repr,
|
|
||||||
)
|
)
|
||||||
from piker.data.validate import FeedInit
|
from piker.data.validate import FeedInit
|
||||||
|
|
||||||
|
# from cryptofeed import FeedHandler
|
||||||
|
# from cryptofeed.defines import (
|
||||||
|
# DERIBIT,
|
||||||
|
# L1_BOOK,
|
||||||
|
# TRADES,
|
||||||
|
# OPTION,
|
||||||
|
# CALL,
|
||||||
|
# PUT,
|
||||||
|
# )
|
||||||
|
# from cryptofeed.symbols import Symbol
|
||||||
|
|
||||||
from .api import (
|
from .api import (
|
||||||
Client,
|
Client,
|
||||||
|
@ -211,64 +219,51 @@ async def get_mkt_info(
|
||||||
price_tick=pair.price_tick,
|
price_tick=pair.price_tick,
|
||||||
size_tick=pair.size_tick,
|
size_tick=pair.size_tick,
|
||||||
bs_mktid=pair.symbol,
|
bs_mktid=pair.symbol,
|
||||||
|
expiry=pair.expiry,
|
||||||
venue=mkt_mode,
|
venue=mkt_mode,
|
||||||
broker='deribit',
|
broker='deribit',
|
||||||
_atype=mkt_mode,
|
_atype=mkt_mode,
|
||||||
_fqme_without_src=True,
|
_fqme_without_src=True,
|
||||||
|
|
||||||
# expiry=pair.expiry,
|
|
||||||
# XXX TODO, currently we don't use it since it's
|
|
||||||
# already "described" in the `OptionPair.symbol: str`
|
|
||||||
# and if we slap in the ISO repr it's kinda hideous..
|
|
||||||
# -[ ] figure out the best either std
|
|
||||||
)
|
)
|
||||||
return mkt, pair
|
return mkt, pair
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
|
|
||||||
send_chan: trio.abc.SendChannel,
|
send_chan: trio.abc.SendChannel,
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
feed_is_live: trio.Event,
|
feed_is_live: trio.Event,
|
||||||
|
loglevel: str = None,
|
||||||
|
|
||||||
# startup sync
|
# startup sync
|
||||||
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
|
||||||
Open a live quote stream for the market set defined by `symbols`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
sym = symbols[0].split('.')[0]
|
sym = symbols[0].split('.')[0]
|
||||||
init_msgs: list[FeedInit] = []
|
|
||||||
|
|
||||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
init_msgs: list[FeedInit] = []
|
||||||
# just that).
|
|
||||||
pfmt: Callable[[str], str] = mk_repr()
|
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
open_cached_client('deribit') as client,
|
open_cached_client('deribit') as client,
|
||||||
send_chan as send_chan
|
send_chan as send_chan
|
||||||
):
|
):
|
||||||
mkt: MktPair
|
|
||||||
pair: Pair
|
|
||||||
mkt, pair = await get_mkt_info(sym)
|
mkt, pair = await get_mkt_info(sym)
|
||||||
|
|
||||||
# build out init msgs according to latest spec
|
# build out init msgs according to latest spec
|
||||||
init_msgs.append(
|
init_msgs.append(
|
||||||
FeedInit(
|
FeedInit(mkt_info=mkt)
|
||||||
mkt_info=mkt,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
# build `cryptofeed` feed-handle
|
nsym = piker_sym_to_cb_sym(sym)
|
||||||
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
|
|
||||||
|
|
||||||
async with maybe_open_price_feed(sym) as stream:
|
async with maybe_open_price_feed(sym) as stream:
|
||||||
last_trades = (
|
|
||||||
await client.last_trades(
|
# TODO, uhh use it ?? XD
|
||||||
cb_sym_to_deribit_inst(cf_sym),
|
# cache = client._pairs
|
||||||
count=1,
|
|
||||||
)
|
last_trades = (await client.last_trades(
|
||||||
).trades
|
cb_sym_to_deribit_inst(nsym), count=1)).trades
|
||||||
|
|
||||||
if len(last_trades) == 0:
|
if len(last_trades) == 0:
|
||||||
last_trade = None
|
last_trade = None
|
||||||
|
@ -291,25 +286,16 @@ async def stream_quotes(
|
||||||
'broker_ts': last_trade.timestamp
|
'broker_ts': last_trade.timestamp
|
||||||
}]
|
}]
|
||||||
}
|
}
|
||||||
task_status.started((
|
task_status.started((init_msgs, first_quote))
|
||||||
init_msgs,
|
|
||||||
first_quote,
|
|
||||||
))
|
|
||||||
|
|
||||||
feed_is_live.set()
|
feed_is_live.set()
|
||||||
|
|
||||||
# NOTE XXX, static for now!
|
|
||||||
# => since this only handles ONE mkt feed at a time we
|
|
||||||
# don't need a lookup table to map interleaved quotes
|
|
||||||
# from multiple possible mkt-pairs
|
|
||||||
topic: str = mkt.bs_fqme
|
|
||||||
|
|
||||||
# deliver until cancelled
|
# deliver until cancelled
|
||||||
async for typ, quote in stream:
|
async for typ, quote in stream:
|
||||||
sym: str = quote['symbol']
|
topic: str = quote['symbol']
|
||||||
log.info(
|
log.info(
|
||||||
f'deribit {typ!r} quote for {sym!r}\n\n'
|
f'deribit {typ!r} quote\n\n'
|
||||||
f'{pfmt(quote)}\n'
|
f'{quote}\n'
|
||||||
)
|
)
|
||||||
await send_chan.send({
|
await send_chan.send({
|
||||||
topic: quote,
|
topic: quote,
|
||||||
|
|
|
@ -26,6 +26,8 @@ from typing import (
|
||||||
)
|
)
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
|
|
||||||
|
from msgspec import field
|
||||||
|
|
||||||
from piker.types import Struct
|
from piker.types import Struct
|
||||||
|
|
||||||
|
|
||||||
|
@ -113,13 +115,9 @@ class OptionPair(Pair, frozen=True):
|
||||||
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
|
||||||
ns_path: str = 'piker.brokers.deribit:OptionPair'
|
ns_path: str = 'piker.brokers.deribit:OptionPair'
|
||||||
|
|
||||||
# TODO, impl this without the MM:SS part of
|
|
||||||
# the `'THH:MM:SS..'` etc..
|
|
||||||
@property
|
@property
|
||||||
def expiry(self) -> str:
|
def expiry(self) -> str:
|
||||||
iso_date = pendulum.from_timestamp(
|
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
|
||||||
self.expiration_timestamp / 1000
|
|
||||||
).isoformat()
|
|
||||||
return iso_date
|
return iso_date
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
|
|
@ -30,7 +30,6 @@ import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
Callable,
|
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -55,9 +54,6 @@ from ._util import (
|
||||||
get_console_log,
|
get_console_log,
|
||||||
)
|
)
|
||||||
from ..service import maybe_spawn_daemon
|
from ..service import maybe_spawn_daemon
|
||||||
from piker.log import (
|
|
||||||
mk_repr,
|
|
||||||
)
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._sharedmem import (
|
from ._sharedmem import (
|
||||||
|
@ -579,6 +575,7 @@ async def open_sample_stream(
|
||||||
|
|
||||||
|
|
||||||
async def sample_and_broadcast(
|
async def sample_and_broadcast(
|
||||||
|
|
||||||
bus: _FeedsBus, # noqa
|
bus: _FeedsBus, # noqa
|
||||||
rt_shm: ShmArray,
|
rt_shm: ShmArray,
|
||||||
hist_shm: ShmArray,
|
hist_shm: ShmArray,
|
||||||
|
@ -599,22 +596,11 @@ async def sample_and_broadcast(
|
||||||
|
|
||||||
overruns = Counter()
|
overruns = Counter()
|
||||||
|
|
||||||
# multiline nested `dict` formatter (since rn quote-msgs are
|
|
||||||
# just that).
|
|
||||||
pfmt: Callable[[str], str] = mk_repr()
|
|
||||||
|
|
||||||
# iterate stream delivered by broker
|
# iterate stream delivered by broker
|
||||||
async for quotes in quote_stream:
|
async for quotes in quote_stream:
|
||||||
|
# print(quotes)
|
||||||
|
|
||||||
# XXX WARNING XXX only enable for debugging bc ow can cost
|
# TODO: ``numba`` this!
|
||||||
# ALOT of perf with HF-feedz!!!
|
|
||||||
#
|
|
||||||
# log.info(
|
|
||||||
# 'Rx live quotes:\n'
|
|
||||||
# f'{pfmt(quotes)}'
|
|
||||||
# )
|
|
||||||
|
|
||||||
# TODO: `numba` this!
|
|
||||||
for broker_symbol, quote in quotes.items():
|
for broker_symbol, quote in quotes.items():
|
||||||
# TODO: in theory you can send the IPC msg *before* writing
|
# TODO: in theory you can send the IPC msg *before* writing
|
||||||
# to the sharedmem array to decrease latency, however, that
|
# to the sharedmem array to decrease latency, however, that
|
||||||
|
@ -687,18 +673,6 @@ async def sample_and_broadcast(
|
||||||
sub_key: str = broker_symbol.lower()
|
sub_key: str = broker_symbol.lower()
|
||||||
subs: set[Sub] = bus.get_subs(sub_key)
|
subs: set[Sub] = bus.get_subs(sub_key)
|
||||||
|
|
||||||
if not subs:
|
|
||||||
all_bs_fqmes: list[str] = list(
|
|
||||||
bus._subscribers.keys()
|
|
||||||
)
|
|
||||||
log.warning(
|
|
||||||
f'No subscribers for {brokername!r} live-quote ??\n'
|
|
||||||
f'broker_symbol: {broker_symbol}\n\n'
|
|
||||||
|
|
||||||
f'Maybe the backend-sys symbol does not match one of,\n'
|
|
||||||
f'{pfmt(all_bs_fqmes)}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# NOTE: by default the broker backend doesn't append
|
# NOTE: by default the broker backend doesn't append
|
||||||
# it's own "name" into the fqme schema (but maybe it
|
# it's own "name" into the fqme schema (but maybe it
|
||||||
# should?) so we have to manually generate the correct
|
# should?) so we have to manually generate the correct
|
||||||
|
|
|
@ -540,10 +540,7 @@ async def open_feed_bus(
|
||||||
# subscription since the backend isn't (yet) expected to
|
# subscription since the backend isn't (yet) expected to
|
||||||
# append it's own name to the fqme, so we filter on keys
|
# append it's own name to the fqme, so we filter on keys
|
||||||
# which *do not* include that name (e.g .ib) .
|
# which *do not* include that name (e.g .ib) .
|
||||||
bus._subscribers.setdefault(
|
bus._subscribers.setdefault(bs_fqme, set())
|
||||||
bs_fqme,
|
|
||||||
set(),
|
|
||||||
)
|
|
||||||
|
|
||||||
# sync feed subscribers with flume handles
|
# sync feed subscribers with flume handles
|
||||||
await ctx.started(
|
await ctx.started(
|
||||||
|
|
28
piker/log.py
28
piker/log.py
|
@ -18,11 +18,7 @@
|
||||||
Log like a forester!
|
Log like a forester!
|
||||||
"""
|
"""
|
||||||
import logging
|
import logging
|
||||||
import reprlib
|
|
||||||
import json
|
import json
|
||||||
from typing import (
|
|
||||||
Callable,
|
|
||||||
)
|
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
from pygments import (
|
from pygments import (
|
||||||
|
@ -88,27 +84,3 @@ def colorize_json(
|
||||||
# likeable styles: algol_nu, tango, monokai
|
# likeable styles: algol_nu, tango, monokai
|
||||||
formatters.TerminalTrueColorFormatter(style=style)
|
formatters.TerminalTrueColorFormatter(style=style)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def mk_repr(
|
|
||||||
**repr_kws,
|
|
||||||
) -> Callable[[str], str]:
|
|
||||||
'''
|
|
||||||
Allocate and deliver a `repr.Repr` instance with provided input
|
|
||||||
settings using the std-lib's `reprlib` mod,
|
|
||||||
* https://docs.python.org/3/library/reprlib.html
|
|
||||||
|
|
||||||
------ Ex. ------
|
|
||||||
An up to 6-layer-nested `dict` as multi-line:
|
|
||||||
- https://stackoverflow.com/a/79102479
|
|
||||||
- https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
|
|
||||||
|
|
||||||
'''
|
|
||||||
def_kws: dict[str, int] = dict(
|
|
||||||
indent=2,
|
|
||||||
maxlevel=6, # recursion levels
|
|
||||||
maxstring=66, # match editor line-len limit
|
|
||||||
)
|
|
||||||
def_kws |= repr_kws
|
|
||||||
reprr = reprlib.Repr(**def_kws)
|
|
||||||
return reprr.repr
|
|
||||||
|
|
Loading…
Reference in New Issue