Compare commits

...

9 Commits

Author SHA1 Message Date
Nelson Torres d475347e53 Deribit api key changes introduce:
- `get_timestamp_int`: added this is the hack, so we can aboid use the custom deribit date format.

- `get_currencies`: added so we could get all deribit's available currencies.

- Also a couple of format fixes.
2025-02-17 15:59:40 -03:00
Tyler Goodlet a4954393f9 `deribit.feed`: fix "trade" event streaming
The main change needed to make `piker.data.feed._FeedsBus` work was
to correctly format the `'trade'` msgs with the (new schema) expected
`'ticks': list[dict]` field which,
- we compute the `piker` quote-msg-`dict` from the (now directly proxied through)
  `cryptofeed.types.Trade`'s fields inside the body of `stream_quotes()`.
- similarly, move the `'l1'` msg processing, **out of** the `asyncio`-side
  `_l1()` callback (defined as a closure in `.api.aio_price_feed_relay()`
  and passed to the `cryptofeed.FeedHandler`) and instead mod the
  callback to simply pass through the `.types.L1Book` ref directly to
  the `piker`/`trio` side task for conversion.

In support of all that,
- mask-to-drop the alt-branch to wait on a first rt event when the
  `cryptofeed.LastTradesResult.trades: list[Trade]` is empty; doesn't
  seem like this ever even happens?
- add a buncha typing, comments and doc-strs to the routines in
  `.deribit.api` including notes on where we can choose to mod the
  `.bs_fqme` for our eventually preferred `piker` style format.
- simplify some nested `@acm` enters to the new single `async with
  <tuple>)` style.
- be particularly pedantic about typing
  `tractor.to_asyncio.LinkedTaskChannel`
- bit of pep8 line-spacing fixes in `.venues`.
2025-02-17 15:59:40 -03:00
Tyler Goodlet 82be0e7935 `.deribit.feed`: get live quotes workin (again)
The quote-msg `'topic'` field was being set and sent as the
`OptionPair.symbol: str` value instead of as the `MktPair.bs_fqme: str`
as is required for matching on the `piker.data.feed` side. So change to
that and simplify the actual `.bs_fqme: str` value to NOT include the
ISO-format time (for now) since it's a big ugly and longer term we need
a `piker`-fqme friendly-on-ze-eyes format/style anyway..
2025-02-17 15:59:40 -03:00
Tyler Goodlet 62c67163ec Bit more `cryptofeed` adapter formatting and typing for clarity.. 2025-02-17 15:59:40 -03:00
Tyler Goodlet 52e45ccabf .deribit.venues: add todo for an ideal `OptionPair.expiry` fmt/value 2025-02-17 15:59:40 -03:00
Tyler Goodlet 0fcf278f8f Report the closest (via fuzzy match) pairs on unmatched input 2025-02-17 15:59:40 -03:00
Tyler Goodlet 33387c4baa Signal hist start using `OptionPair.creation_timestamp`
Such that the `get_hist()` query func raises `DataUnavailable` with an
explicit message regarding the start of the (option) contract's
lifetime.

Other,
- mask some unused imports (for now?)
- drop a duplicate `tractor.get_console_log()` call which was causing
  duplicate console emits (it's already setup by brokerd init now).
- comment various unused code bits i found.
- add a info log around live quotes so we can see for the moment when
  they actually occur.. XD
2025-02-17 15:59:40 -03:00
Tyler Goodlet 44dcad1745 `.deribit.api` bit of tidying/typing
There were some imports missing or unused as well as a variety of spots
that had grokability issues due to missing type hints.

Other tweaks as part some more thorough manual testing:
- always raise when not `brokers.toml` section since the API can never
  work (no free data without keys).
- inline the `Asset.atype='crypto_currency` field despite it maybe not
  being the best value for `OptionPair` instruments..
- tossed in a now-masked pause block for debugging history queries in
  `Client.bars()`.
- commented out all the live order ctl (internal) endpoints for now
  since they're unused.
2025-02-17 15:59:40 -03:00
Tyler Goodlet 3a65f95636 'Fix `Optional` and use `'linear/reverse'` in `OptionPair.venue`' 2025-02-17 15:59:40 -03:00
3 changed files with 452 additions and 223 deletions

View File

@ -28,6 +28,8 @@ from decimal import (
Decimal,
)
from functools import partial
from pathlib import Path
from pprint import pformat
import time
from typing import (
Any,
@ -37,8 +39,6 @@ from typing import (
from pendulum import now
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
import numpy as np
from tractor.trionics import (
broadcast_receiver,
@ -52,11 +52,16 @@ from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT,
L1_BOOK, TRADES,
OPTION, CALL, PUT
OPTION, CALL, PUT,
OPEN_INTEREST,
)
from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from cryptofeed.types import (
L1Book,
Trade,
OpenInterest,
)
from piker.brokers import SymbolNotFound
from .venues import (
_ws_url,
MarketType,
@ -64,9 +69,7 @@ from .venues import (
Pair,
OptionPair,
JSONRPCResult,
JSONRPCChannel,
KLinesResult,
Trade,
LastTradesResult,
)
from piker.accounting import (
@ -77,7 +80,7 @@ from piker.accounting import (
from piker.data import (
def_iohlcv_fields,
match_from_pairs,
Struct,
# Struct,
)
from piker.data._web_bs import (
open_jsonrpc_session
@ -96,9 +99,21 @@ _spawn_kwargs = {
}
# convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
def deribit_timestamp(when: datetime) -> int:
'''
Convert conventional epoch timestamp, in secs, to unixtime in
milliseconds.
'''
return int(
(when.timestamp() * 1000)
+
(when.microsecond / 1000)
)
def get_timestamp_int(expiry_date: str) -> int:
return int(time.mktime(time.strptime(expiry_date, '%d%b%y')))
def str_to_cb_sym(name: str) -> Symbol:
@ -107,32 +122,40 @@ def str_to_cb_sym(name: str) -> Symbol:
quote = base
if option_type == 'put':
option_type = PUT
elif option_type == 'call':
option_type = PUT
elif option_type == 'call':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
new_expiry_date: int = get_timestamp_int(
get_values_from_cb_normalized_date(expiry_date)
)
return Symbol(
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=new_expiry_date)
expiry_date=new_expiry_date
)
def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple(
(
base,
expiry_date,
strike_price,
option_type,
)= tuple(
name.upper().split('-'))
quote = base
new_expiry_date = get_timestamp_int(expiry_date)
quote: str = base
if option_type == 'P':
option_type = PUT
elif option_type == 'C':
if option_type == 'P' or option_type == 'PUT':
option_type = PUT
elif option_type == 'C' or option_type == 'CALL':
option_type = CALL
else:
raise Exception("Couldn\'t parse option type")
@ -143,14 +166,32 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date)
expiry_date=new_expiry_date
)
def cb_sym_to_deribit_inst(sym: Symbol):
# TODO, instead can't we just lookup the `MktPair` directly
# and pass it upward to `stream_quotes()`??
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
'''
Generate our own internal `str`-repr for a `cryptofeed.Symbol`
uniquely from its fields.
This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
for now i suppose..?
'''
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
otype = (
'C' if sym.option_type == CALL
else 'P'
)
return (
f'{sym.base}-'
f'{new_expiry_date}-'
f'{sym.strike_price}-'
f'{otype}'
)
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
@ -179,32 +220,39 @@ def get_config() -> dict[str, Any]:
conf: dict
path: Path
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section = conf.get('deribit')
section: dict|None = conf.get('deribit')
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
raise ValueError(
f'No `[deribit]` section found in\n'
f'{path!r}\n\n'
f'See the template config from the core repo for samples..\n'
# f'<TODO put repo link here??>'
)
conf_option = section.get('option', {})
section.clear # clear the dict to reuse it
section['deribit'] = {}
section['deribit']['key_id'] = conf_option.get('api_key')
section['deribit']['key_secret'] = conf_option.get('api_secret')
section['log'] = {}
section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG'
return section
conf_log = conf_option.get('log', {})
return {
'deribit': {
'key_id': conf_option['key_id'],
'key_secret': conf_option['key_secret'],
},
'log': {
'filename': conf_log['filename'],
'level': conf_log['level'],
'disabled': conf_log['disabled'],
}
}
class Client:
'''
Hi-level interface for the jsron-RPC over websocket API.
'''
def __init__(
self,
@ -223,8 +271,12 @@ class Client:
self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth
async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
async def _json_rpc_auth_wrapper(
self,
*args,
**kwargs,
) -> JSONRPCResult:
"""Background task that adquires a first access token and then will
refresh the access token.
@ -250,9 +302,6 @@ class Client:
return await self.json_rpc(*args, **kwargs)
async def get_balances(
self,
kind: str = 'option'
@ -272,28 +321,44 @@ class Client:
return balances
async def get_assets(
async def get_currencies(
self,
venue: str | None = None,
) -> dict[str, Asset]:
"""Return the set of asset balances for this account
by symbol.
"""
) -> list[dict]:
'''
Return the set of currencies for deribit.
'''
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
currencies = resp.result
return resp.result
async def get_assets(
self,
venue: str | None = None,
) -> dict[str, Asset]:
'''
Return the set of asset balances for this account
by (deribit's) symbol.
'''
assets = {}
currencies = await self.get_currencies()
for currency in currencies:
name = currency['currency']
tx_tick = digits_to_dec(currency['fee_precision'])
atype='crypto_currency'
name: str = currency['currency']
tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
# TODO, handling of options, futures, perps etc. more
# specifically with diff `.atype`s?
assets[name] = Asset(
name=name,
atype=atype,
tx_tick=tx_tick)
atype='crypto_currency',
tx_tick=tx_tick,
)
instruments = await self.symbol_info(currency=name)
for instrument in instruments:
@ -301,9 +366,10 @@ class Client:
assets[pair.symbol] = Asset(
name=pair.symbol,
atype=pair.venue,
tx_tick=pair.size_tick)
tx_tick=pair.size_tick,
)
return assets
return assets
async def get_mkt_pairs(self) -> dict[str, Pair]:
flat: dict[str, Pair] = {}
@ -358,6 +424,19 @@ class Client:
return cached_pair
if sym:
opt: OptionPair|None = pair_table.get(sym)
if not opt:
closest_matches: dict[str, Pair] = match_from_pairs(
pairs=pair_table,
query=sym,
score_cutoff=40,
)
closest_syms: list[str] = list(closest_matches.keys())
raise ValueError(
f'No contract found for {sym!r}\n\n'
f'Closest {len(closest_syms)} available contracts:\n\n'
f'{pformat(closest_syms)}\n'
)
return pair_table[sym]
else:
return self._pairs
@ -381,7 +460,7 @@ class Client:
params: dict[str, str] = {
'currency': currency.upper(),
'kind': kind,
'expired': str(expired).lower()
'expired': expired,
}
resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
@ -389,9 +468,9 @@ class Client:
params,
)
# convert to symbol-keyed table
pair_type: Type = PAIRTYPES[kind]
pair_type: Pair = PAIRTYPES[kind]
results: list[dict] | None = resp.result
instruments: dict[str, Pair] = {}
for item in results:
symbol=item['instrument_name'].lower()
@ -427,12 +506,15 @@ class Client:
mkt_pairs = await self.symbol_info()
if not mkt_pairs:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
raise SymbolNotFound(
f'No market pairs found!?:\n'
f'{mkt_pairs}'
)
pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs:
pair_type: Type = PAIRTYPES[venue]
pair_type: Pair|OptionPair = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
@ -480,12 +562,14 @@ class Client:
if end_dt is None:
end_dt = now('UTC')
_orig_start_dt = start_dt
if start_dt is None:
start_dt = end_dt.start_of(
'minute').subtract(minutes=limit)
'minute'
).subtract(minutes=limit)
start_time = deribit_timestamp(start_dt)
end_time = deribit_timestamp(end_dt)
start_time: int = deribit_timestamp(start_dt)
end_time: int = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self._json_rpc_auth_wrapper(
@ -499,9 +583,13 @@ class Client:
result = KLinesResult(**resp.result)
new_bars: list[tuple] = []
for i in range(len(result.close)):
# if _orig_start_dt is None:
# if not new_bars:
# import tractor
# await tractor.pause()
row = [
for i in range(len(result.close)):
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
@ -554,7 +642,7 @@ async def get_client(
@acm
async def open_feed_handler():
async def open_feed_handler() -> FeedHandler:
fh = FeedHandler(config=get_config())
yield fh
await to_asyncio.run_task(fh.stop_async)
@ -575,43 +663,37 @@ async def aio_price_feed_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
'broker_ts': time.time(),
'data': data.to_dict(),
'receipt': receipt_timestamp
}))
'''
Relay price feed quotes from the `cryptofeed.FeedHandler` to
the `piker`-side `trio.task` consumers for delivery to consumer
sub-actors for various subsystems.
'''
async def _trade(
trade: Trade, # cryptofeed, NOT ours from `.venues`!
receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
to_trio.send_nowait(('trade', trade))
async def _l1(
book: L1Book,
receipt_timestamp: int,
) -> None:
'''
Relay-thru "l1 book" updates.
'''
to_trio.send_nowait(('l1', book))
# TODO, make this work!
# -[ ] why isn't this working in `tractor.pause_from_sync()`??
# breakpoint()
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{
'type': 'bid',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'bsize',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
]
}))
sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed(
DERIBIT,
@ -625,27 +707,35 @@ async def aio_price_feed_relay(
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
install_signal_handlers=False
)
# sync with trio
to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf'))
@acm
async def open_price_feed(
instrument: str
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
) -> to_asyncio.LinkedTaskChannel:
fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial(
aio_price_feed_relay,
fh,
instrument
)
) as (first, chan):
yield chan
) as (first, chan)
):
yield chan
@acm
@ -654,6 +744,7 @@ async def maybe_open_price_feed(
) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context(
acm_func=open_price_feed,
kwargs={
@ -668,68 +759,69 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay(
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
# TODO, move all to `.broker` submod!
# async def aio_order_feed_relay(
# fh: FeedHandler,
# instrument: Symbol,
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
# ) -> None:
# async def _fill(data: dict, receipt_timestamp):
# breakpoint()
async def _order_info(data: dict, receipt_timestamp):
breakpoint()
# async def _order_info(data: dict, receipt_timestamp):
# breakpoint()
fh.add_feed(
DERIBIT,
channels=[FILLS, ORDER_INFO],
symbols=[instrument.upper()],
callbacks={
FILLS: _fill,
ORDER_INFO: _order_info,
})
# fh.add_feed(
# DERIBIT,
# channels=[FILLS, ORDER_INFO],
# symbols=[instrument.upper()],
# callbacks={
# FILLS: _fill,
# ORDER_INFO: _order_info,
# })
if not fh.running:
fh.run(
start_loop=False,
install_signal_handlers=False)
# if not fh.running:
# fh.run(
# start_loop=False,
# install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
# # sync with trio
# to_trio.send_nowait(None)
await asyncio.sleep(float('inf'))
# await asyncio.sleep(float('inf'))
@acm
async def open_order_feed(
instrument: list[str]
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_order_feed_relay,
fh,
instrument
)
) as (first, chan):
yield chan
# @acm
# async def open_order_feed(
# instrument: list[str]
# ) -> trio.abc.ReceiveStream:
# async with maybe_open_feed_handler() as fh:
# async with to_asyncio.open_channel_from(
# partial(
# aio_order_feed_relay,
# fh,
# instrument
# )
# ) as (first, chan):
# yield chan
@acm
async def maybe_open_order_feed(
instrument: str
) -> trio.abc.ReceiveStream:
# @acm
# async def maybe_open_order_feed(
# instrument: str
# ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context
async with maybe_open_context(
acm_func=open_order_feed,
kwargs={
'instrument': instrument.split('.')[0],
'fh': fh
},
key=f'{instrument.split('.')[0]}-order',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
else:
yield feed
# # TODO: add a predicate to maybe_open_context
# async with maybe_open_context(
# acm_func=open_order_feed,
# kwargs={
# 'instrument': instrument.split('.')[0],
# 'fh': fh
# },
# key=f'{instrument.split('.')[0]}-order',
# ) as (cache_hit, feed):
# if cache_hit:
# yield broadcast_receiver(feed, 10)
# else:
# yield feed

View File

@ -18,56 +18,58 @@
Deribit backend.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, Callable
from pprint import pformat
from typing import (
# Any,
# Optional,
Callable,
)
# from pprint import pformat
import time
import cryptofeed
import trio
from trio_typing import TaskStatus
from pendulum import (
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.accounting import (
Asset,
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
DataUnavailable,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.log import (
get_logger,
mk_repr,
)
from piker.data.validate import FeedInit
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import (
Client, Trade,
get_config,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
Client,
# get_config,
piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
str_to_cb_sym,
maybe_open_price_feed
)
from .venues import (
Pair,
OptionPair,
Trade,
)
_spawn_kwargs = {
@ -86,6 +88,10 @@ async def open_history_client(
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc(
timeframe: float,
end_dt: datetime | None = None,
@ -105,6 +111,31 @@ async def open_history_client(
end_dt=end_dt,
)
if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
@ -126,14 +157,20 @@ async def open_history_client(
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair] | None:
) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
@ -149,7 +186,7 @@ async def get_mkt_info(
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
venue_lower: str = venue.lower()
# venue_lower: str = venue.lower()
mkt_mode: str = 'option'
@ -175,64 +212,88 @@ async def get_mkt_info(
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
expiry=pair.expiry,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
)
return mkt, pair
async def stream_quotes(
send_chan: trio.abc.SendChannel,
symbols: list[str],
feed_is_live: trio.Event,
loglevel: str = None,
# startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
'''
Open a live quote stream for the market set defined by `symbols`.
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
FeedInit(
mkt_info=mkt,
)
)
nsym = piker_sym_to_cb_sym(sym)
# build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream:
from_cf: tractor.to_asyncio.LinkedTaskChannel
async with maybe_open_price_feed(sym) as from_cf:
cache = client._pairs
# load the "last trades" summary
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
# TODO, do we even need this or will the above always
# work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
if len(last_trades) == 0:
last_trade = None
async for typ, quote in stream:
if typ == 'trade':
last_trade = Trade(**(quote['data']))
break
# else:
last_trade = Trade(
**(last_trades[0])
)
else:
last_trade = Trade(**(last_trades[0]))
first_quote = {
first_quote: dict = {
'symbol': sym,
'last': last_trade.price,
'brokerd_ts': last_trade.timestamp,
@ -243,13 +304,84 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp
}]
}
task_status.started((init_msgs, first_quote))
task_status.started((
init_msgs,
first_quote,
))
feed_is_live.set()
async for typ, quote in stream:
topic = quote['symbol']
await send_chan.send({topic: quote})
# NOTE XXX, static for now!
# => since this only handles ONE mkt feed at a time we
# don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
@tractor.context
@ -259,13 +391,13 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
cache = client._pairs
# cache = client._pairs
await ctx.started()
async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within
# the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(

View File

@ -22,11 +22,10 @@ from __future__ import annotations
import pendulum
from typing import (
Literal,
Optional,
)
from decimal import Decimal
from msgspec import field
from piker.types import Struct
@ -111,18 +110,21 @@ class OptionPair(Pair, frozen=True):
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date
@property
def venue(self) -> str:
return 'option'
return f'{self.instrument_type}_option'
@property
def bs_fqme(self) -> str:
@ -152,6 +154,7 @@ class JSONRPCResult(Struct):
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
@ -168,6 +171,7 @@ class KLinesResult(Struct):
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
@ -186,6 +190,7 @@ class Trade(Struct):
block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool