Compare commits

...

10 Commits

Author SHA1 Message Date
Nelson Torres ab9a6f9236 Deribit api key changes introduce:
- `get_timestamp_int`: added this is the hack, so we can aboid use the custom deribit date format.

- `get_currencies`: added so we could get all deribit's available currencies.

- Also a couple of format fixes.
2025-02-17 14:25:06 -03:00
Tyler Goodlet 5a2b643112 `deribit.feed`: fix "trade" event streaming
The main change needed to make `piker.data.feed._FeedsBus` work was
to correctly format the `'trade'` msgs with the (new schema) expected
`'ticks': list[dict]` field which,
- we compute the `piker` quote-msg-`dict` from the (now directly proxied through)
  `cryptofeed.types.Trade`'s fields inside the body of `stream_quotes()`.
- similarly, move the `'l1'` msg processing, **out of** the `asyncio`-side
  `_l1()` callback (defined as a closure in `.api.aio_price_feed_relay()`
  and passed to the `cryptofeed.FeedHandler`) and instead mod the
  callback to simply pass through the `.types.L1Book` ref directly to
  the `piker`/`trio` side task for conversion.

In support of all that,
- mask-to-drop the alt-branch to wait on a first rt event when the
  `cryptofeed.LastTradesResult.trades: list[Trade]` is empty; doesn't
  seem like this ever even happens?
- add a buncha typing, comments and doc-strs to the routines in
  `.deribit.api` including notes on where we can choose to mod the
  `.bs_fqme` for our eventually preferred `piker` style format.
- simplify some nested `@acm` enters to the new single `async with
  <tuple>)` style.
- be particularly pedantic about typing
  `tractor.to_asyncio.LinkedTaskChannel`
- bit of pep8 line-spacing fixes in `.venues`.
2025-02-17 14:25:06 -03:00
Tyler Goodlet 82ad5cf0d9 `.deribit.feed`: get live quotes workin (again)
The quote-msg `'topic'` field was being set and sent as the
`OptionPair.symbol: str` value instead of as the `MktPair.bs_fqme: str`
as is required for matching on the `piker.data.feed` side. So change to
that and simplify the actual `.bs_fqme: str` value to NOT include the
ISO-format time (for now) since it's a big ugly and longer term we need
a `piker`-fqme friendly-on-ze-eyes format/style anyway..
2025-02-17 14:25:06 -03:00
Tyler Goodlet 4076288ab1 Bit more `cryptofeed` adapter formatting and typing for clarity.. 2025-02-17 14:25:06 -03:00
Tyler Goodlet 6af929abad .deribit.venues: add todo for an ideal `OptionPair.expiry` fmt/value 2025-02-17 14:25:06 -03:00
Tyler Goodlet 8999d6d73a Report the closest (via fuzzy match) pairs on unmatched input 2025-02-17 14:25:06 -03:00
Tyler Goodlet 9703d99ac3 Signal hist start using `OptionPair.creation_timestamp`
Such that the `get_hist()` query func raises `DataUnavailable` with an
explicit message regarding the start of the (option) contract's
lifetime.

Other,
- mask some unused imports (for now?)
- drop a duplicate `tractor.get_console_log()` call which was causing
  duplicate console emits (it's already setup by brokerd init now).
- comment various unused code bits i found.
- add a info log around live quotes so we can see for the moment when
  they actually occur.. XD
2025-02-17 14:25:06 -03:00
Tyler Goodlet 9cb561706b `.deribit.api` bit of tidying/typing
There were some imports missing or unused as well as a variety of spots
that had grokability issues due to missing type hints.

Other tweaks as part some more thorough manual testing:
- always raise when not `brokers.toml` section since the API can never
  work (no free data without keys).
- inline the `Asset.atype='crypto_currency` field despite it maybe not
  being the best value for `OptionPair` instruments..
- tossed in a now-masked pause block for debugging history queries in
  `Client.bars()`.
- commented out all the live order ctl (internal) endpoints for now
  since they're unused.
2025-02-17 14:25:06 -03:00
Tyler Goodlet ce9490844f 'Fix `Optional` and use `'linear/reverse'` in `OptionPair.venue`' 2025-02-17 14:25:06 -03:00
Tyler Goodlet 304c6c30e6 Only use `frame_types` if delivered during enter
The `open_history_client()` provider endpoint can *optionally*
deliver a `frame_types: dict[int, pendulum.Duration]` subsection in its
`config: dict[str, dict]` (as was implemented with the `ib` backend).
This allows the `tsp` backfilling machinery to use this "recommended
frame duration" to subtract from the `last_start_dt` any time a `NoData`
gap is signalled by the `get_hist()` call allowing gaps to be ignored
safely without missing history by knowing the next earliest dt we can
query from using the `end_dt`. However, currently all crypto$ providers
haven't implemented this feat yet..

As such only try to use the `frame_types` feature if provided when
handling `NoData` conditions inside `tsp.start_backfill()` and otherwise
raise as normal.
2025-02-17 14:25:06 -03:00
4 changed files with 470 additions and 230 deletions

View File

@ -28,6 +28,8 @@ from decimal import (
Decimal, Decimal,
) )
from functools import partial from functools import partial
from pathlib import Path
from pprint import pformat
import time import time
from typing import ( from typing import (
Any, Any,
@ -37,8 +39,6 @@ from typing import (
from pendulum import now from pendulum import now
import trio import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
from tractor.trionics import ( from tractor.trionics import (
broadcast_receiver, broadcast_receiver,
@ -52,11 +52,16 @@ from cryptofeed import FeedHandler
from cryptofeed.defines import ( from cryptofeed.defines import (
DERIBIT, DERIBIT,
L1_BOOK, TRADES, L1_BOOK, TRADES,
OPTION, CALL, PUT OPTION, CALL, PUT,
OPEN_INTEREST,
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
# types for managing the cb callbacks. from cryptofeed.types import (
# from cryptofeed.types import L1Book L1Book,
Trade,
OpenInterest,
)
from piker.brokers import SymbolNotFound
from .venues import ( from .venues import (
_ws_url, _ws_url,
MarketType, MarketType,
@ -64,9 +69,7 @@ from .venues import (
Pair, Pair,
OptionPair, OptionPair,
JSONRPCResult, JSONRPCResult,
JSONRPCChannel,
KLinesResult, KLinesResult,
Trade,
LastTradesResult, LastTradesResult,
) )
from piker.accounting import ( from piker.accounting import (
@ -77,7 +80,7 @@ from piker.accounting import (
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
Struct, # Struct,
) )
from piker.data._web_bs import ( from piker.data._web_bs import (
open_jsonrpc_session open_jsonrpc_session
@ -96,9 +99,21 @@ _spawn_kwargs = {
} }
# convert datetime obj timestamp to unixtime in milliseconds def deribit_timestamp(when: datetime) -> int:
def deribit_timestamp(when): '''
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) Convert conventional epoch timestamp, in secs, to unixtime in
milliseconds.
'''
return int(
(when.timestamp() * 1000)
+
(when.microsecond / 1000)
)
def get_timestamp_int(expiry_date: str) -> int:
return int(time.mktime(time.strptime(expiry_date, '%d%b%y')))
def str_to_cb_sym(name: str) -> Symbol: def str_to_cb_sym(name: str) -> Symbol:
@ -107,32 +122,40 @@ def str_to_cb_sym(name: str) -> Symbol:
quote = base quote = base
if option_type == 'put': if option_type == 'put':
option_type = PUT option_type = PUT
elif option_type == 'call': elif option_type == 'call':
option_type = CALL option_type = CALL
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date) new_expiry_date: int = get_timestamp_int(
get_values_from_cb_normalized_date(expiry_date)
)
return Symbol( return Symbol(
base=base, base=base,
quote=quote, quote=quote,
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=new_expiry_date) expiry_date=new_expiry_date
)
def piker_sym_to_cb_sym(name: str) -> Symbol: def piker_sym_to_cb_sym(name: str) -> Symbol:
base, expiry_date, strike_price, option_type = tuple( (
base,
expiry_date,
strike_price,
option_type,
)= tuple(
name.upper().split('-')) name.upper().split('-'))
quote = base new_expiry_date = get_timestamp_int(expiry_date)
quote: str = base
if option_type == 'P': if option_type == 'P' or option_type == 'PUT':
option_type = PUT option_type = PUT
elif option_type == 'C': elif option_type == 'C' or option_type == 'CALL':
option_type = CALL option_type = CALL
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
@ -143,14 +166,32 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date) expiry_date=new_expiry_date
)
def cb_sym_to_deribit_inst(sym: Symbol): # TODO, instead can't we just lookup the `MktPair` directly
# and pass it upward to `stream_quotes()`??
def cb_sym_to_deribit_inst(sym: Symbol) -> str:
'''
Generate our own internal `str`-repr for a `cryptofeed.Symbol`
uniquely from its fields.
This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
for now i suppose..?
'''
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = 'C' if sym.option_type == CALL else 'P' otype = (
'C' if sym.option_type == CALL
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' else 'P'
)
return (
f'{sym.base}-'
f'{new_expiry_date}-'
f'{sym.strike_price}-'
f'{otype}'
)
def get_values_from_cb_normalized_date(expiry_date: str) -> str: def get_values_from_cb_normalized_date(expiry_date: str) -> str:
@ -179,32 +220,39 @@ def get_config() -> dict[str, Any]:
conf: dict conf: dict
path: Path path: Path
conf, path = config.load( conf, path = config.load(
conf_name='brokers', conf_name='brokers',
touch_if_dne=True, touch_if_dne=True,
) )
section: dict = {} section: dict|None = conf.get('deribit')
section = conf.get('deribit')
if section is None: if section is None:
log.warning(f'No config section found for deribit in {path}') raise ValueError(
return {} f'No `[deribit]` section found in\n'
f'{path!r}\n\n'
f'See the template config from the core repo for samples..\n'
# f'<TODO put repo link here??>'
)
conf_option = section.get('option', {}) conf_option = section.get('option', {})
section.clear # clear the dict to reuse it conf_log = conf_option.get('log', {})
section['deribit'] = {} return {
section['deribit']['key_id'] = conf_option.get('api_key') 'deribit': {
section['deribit']['key_secret'] = conf_option.get('api_secret') 'key_id': conf_option['key_id'],
'key_secret': conf_option['key_secret'],
section['log'] = {} },
section['log']['filename'] = 'feedhandler.log' 'log': {
section['log']['level'] = 'DEBUG' 'filename': conf_log['filename'],
'level': conf_log['level'],
return section 'disabled': conf_log['disabled'],
}
}
class Client: class Client:
'''
Hi-level interface for the jsron-RPC over websocket API.
'''
def __init__( def __init__(
self, self,
@ -223,8 +271,12 @@ class Client:
self._auth_ts = None self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth self._auth_renew_ts = 5 # seconds to renew auth
async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: async def _json_rpc_auth_wrapper(
self,
*args,
**kwargs,
) -> JSONRPCResult:
"""Background task that adquires a first access token and then will """Background task that adquires a first access token and then will
refresh the access token. refresh the access token.
@ -250,9 +302,6 @@ class Client:
return await self.json_rpc(*args, **kwargs) return await self.json_rpc(*args, **kwargs)
async def get_balances( async def get_balances(
self, self,
kind: str = 'option' kind: str = 'option'
@ -272,28 +321,44 @@ class Client:
return balances return balances
async def get_assets( async def get_currencies(
self, self,
venue: str | None = None,
) -> dict[str, Asset]: ) -> list[dict]:
"""Return the set of asset balances for this account '''
by symbol. Return the set of currencies for deribit.
""" '''
assets = {} assets = {}
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
'public/get_currencies', 'public/get_currencies',
params={} params={}
) )
currencies = resp.result return resp.result
async def get_assets(
self,
venue: str | None = None,
) -> dict[str, Asset]:
'''
Return the set of asset balances for this account
by (deribit's) symbol.
'''
assets = {}
currencies = await self.get_currencies()
for currency in currencies: for currency in currencies:
name = currency['currency'] name: str = currency['currency']
tx_tick = digits_to_dec(currency['fee_precision']) tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
atype='crypto_currency'
# TODO, handling of options, futures, perps etc. more
# specifically with diff `.atype`s?
assets[name] = Asset( assets[name] = Asset(
name=name, name=name,
atype=atype, atype='crypto_currency',
tx_tick=tx_tick) tx_tick=tx_tick,
)
instruments = await self.symbol_info(currency=name) instruments = await self.symbol_info(currency=name)
for instrument in instruments: for instrument in instruments:
@ -301,9 +366,10 @@ class Client:
assets[pair.symbol] = Asset( assets[pair.symbol] = Asset(
name=pair.symbol, name=pair.symbol,
atype=pair.venue, atype=pair.venue,
tx_tick=pair.size_tick) tx_tick=pair.size_tick,
)
return assets return assets
async def get_mkt_pairs(self) -> dict[str, Pair]: async def get_mkt_pairs(self) -> dict[str, Pair]:
flat: dict[str, Pair] = {} flat: dict[str, Pair] = {}
@ -358,6 +424,19 @@ class Client:
return cached_pair return cached_pair
if sym: if sym:
opt: OptionPair|None = pair_table.get(sym)
if not opt:
closest_matches: dict[str, Pair] = match_from_pairs(
pairs=pair_table,
query=sym,
score_cutoff=40,
)
closest_syms: list[str] = list(closest_matches.keys())
raise ValueError(
f'No contract found for {sym!r}\n\n'
f'Closest {len(closest_syms)} available contracts:\n\n'
f'{pformat(closest_syms)}\n'
)
return pair_table[sym] return pair_table[sym]
else: else:
return self._pairs return self._pairs
@ -381,7 +460,7 @@ class Client:
params: dict[str, str] = { params: dict[str, str] = {
'currency': currency.upper(), 'currency': currency.upper(),
'kind': kind, 'kind': kind,
'expired': str(expired).lower() 'expired': expired,
} }
resp: JSONRPCResult = await self._json_rpc_auth_wrapper( resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
@ -389,9 +468,9 @@ class Client:
params, params,
) )
# convert to symbol-keyed table # convert to symbol-keyed table
pair_type: Type = PAIRTYPES[kind] pair_type: Pair = PAIRTYPES[kind]
results: list[dict] | None = resp.result results: list[dict] | None = resp.result
instruments: dict[str, Pair] = {} instruments: dict[str, Pair] = {}
for item in results: for item in results:
symbol=item['instrument_name'].lower() symbol=item['instrument_name'].lower()
@ -427,12 +506,15 @@ class Client:
mkt_pairs = await self.symbol_info() mkt_pairs = await self.symbol_info()
if not mkt_pairs: if not mkt_pairs:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}') raise SymbolNotFound(
f'No market pairs found!?:\n'
f'{mkt_pairs}'
)
pairs_view_subtable: dict[str, Pair] = {} pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs: for instrument in mkt_pairs:
pair_type: Type = PAIRTYPES[venue] pair_type: Pair|OptionPair = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
@ -480,12 +562,14 @@ class Client:
if end_dt is None: if end_dt is None:
end_dt = now('UTC') end_dt = now('UTC')
_orig_start_dt = start_dt
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
'minute').subtract(minutes=limit) 'minute'
).subtract(minutes=limit)
start_time = deribit_timestamp(start_dt) start_time: int = deribit_timestamp(start_dt)
end_time = deribit_timestamp(end_dt) end_time: int = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data # https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
@ -499,9 +583,13 @@ class Client:
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars: list[tuple] = [] new_bars: list[tuple] = []
for i in range(len(result.close)): # if _orig_start_dt is None:
# if not new_bars:
# import tractor
# await tractor.pause()
row = [ for i in range(len(result.close)):
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
result.high[i], result.high[i],
@ -554,7 +642,7 @@ async def get_client(
@acm @acm
async def open_feed_handler(): async def open_feed_handler() -> FeedHandler:
fh = FeedHandler(config=get_config()) fh = FeedHandler(config=get_config())
yield fh yield fh
await to_asyncio.run_task(fh.stop_async) await to_asyncio.run_task(fh.stop_async)
@ -575,43 +663,37 @@ async def aio_price_feed_relay(
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
async def _trade(data: dict, receipt_timestamp): '''
to_trio.send_nowait(('trade', { Relay price feed quotes from the `cryptofeed.FeedHandler` to
'symbol': cb_sym_to_deribit_inst( the `piker`-side `trio.task` consumers for delivery to consumer
str_to_cb_sym(data.symbol)).lower(), sub-actors for various subsystems.
'last': data,
'broker_ts': time.time(), '''
'data': data.to_dict(), async def _trade(
'receipt': receipt_timestamp trade: Trade, # cryptofeed, NOT ours from `.venues`!
})) receipt_timestamp: int,
) -> None:
'''
Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
'''
to_trio.send_nowait(('trade', trade))
async def _l1(
book: L1Book,
receipt_timestamp: int,
) -> None:
'''
Relay-thru "l1 book" updates.
'''
to_trio.send_nowait(('l1', book))
# TODO, make this work!
# -[ ] why isn't this working in `tractor.pause_from_sync()`??
# breakpoint()
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{
'type': 'bid',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'bsize',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
]
}))
sym: Symbol = piker_sym_to_cb_sym(instrument) sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,
@ -625,27 +707,35 @@ async def aio_price_feed_relay(
if not fh.running: if not fh.running:
fh.run( fh.run(
start_loop=False, start_loop=False,
install_signal_handlers=False) install_signal_handlers=False
)
# sync with trio # sync with trio
to_trio.send_nowait(None) to_trio.send_nowait(None)
# run until cancelled
await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
@acm @acm
async def open_price_feed( async def open_price_feed(
instrument: str instrument: str
) -> trio.abc.ReceiveStream: ) -> to_asyncio.LinkedTaskChannel:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( fh: FeedHandler
first: None
chan: to_asyncio.LinkedTaskChannel
async with (
maybe_open_feed_handler() as fh,
to_asyncio.open_channel_from(
partial( partial(
aio_price_feed_relay, aio_price_feed_relay,
fh, fh,
instrument instrument
) )
) as (first, chan): ) as (first, chan)
yield chan ):
yield chan
@acm @acm
@ -654,6 +744,7 @@ async def maybe_open_price_feed(
) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context # TODO: add a predicate to maybe_open_context
feed: to_asyncio.LinkedTaskChannel
async with maybe_open_context( async with maybe_open_context(
acm_func=open_price_feed, acm_func=open_price_feed,
kwargs={ kwargs={
@ -668,68 +759,69 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay( # TODO, move all to `.broker` submod!
fh: FeedHandler, # async def aio_order_feed_relay(
instrument: Symbol, # fh: FeedHandler,
from_trio: asyncio.Queue, # instrument: Symbol,
to_trio: trio.abc.SendChannel, # from_trio: asyncio.Queue,
) -> None: # to_trio: trio.abc.SendChannel,
async def _fill(data: dict, receipt_timestamp): # ) -> None:
breakpoint() # async def _fill(data: dict, receipt_timestamp):
# breakpoint()
async def _order_info(data: dict, receipt_timestamp): # async def _order_info(data: dict, receipt_timestamp):
breakpoint() # breakpoint()
fh.add_feed( # fh.add_feed(
DERIBIT, # DERIBIT,
channels=[FILLS, ORDER_INFO], # channels=[FILLS, ORDER_INFO],
symbols=[instrument.upper()], # symbols=[instrument.upper()],
callbacks={ # callbacks={
FILLS: _fill, # FILLS: _fill,
ORDER_INFO: _order_info, # ORDER_INFO: _order_info,
}) # })
if not fh.running: # if not fh.running:
fh.run( # fh.run(
start_loop=False, # start_loop=False,
install_signal_handlers=False) # install_signal_handlers=False)
# sync with trio # # sync with trio
to_trio.send_nowait(None) # to_trio.send_nowait(None)
await asyncio.sleep(float('inf')) # await asyncio.sleep(float('inf'))
@acm # @acm
async def open_order_feed( # async def open_order_feed(
instrument: list[str] # instrument: list[str]
) -> trio.abc.ReceiveStream: # ) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh: # async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from( # async with to_asyncio.open_channel_from(
partial( # partial(
aio_order_feed_relay, # aio_order_feed_relay,
fh, # fh,
instrument # instrument
) # )
) as (first, chan): # ) as (first, chan):
yield chan # yield chan
@acm # @acm
async def maybe_open_order_feed( # async def maybe_open_order_feed(
instrument: str # instrument: str
) -> trio.abc.ReceiveStream: # ) -> trio.abc.ReceiveStream:
# TODO: add a predicate to maybe_open_context # # TODO: add a predicate to maybe_open_context
async with maybe_open_context( # async with maybe_open_context(
acm_func=open_order_feed, # acm_func=open_order_feed,
kwargs={ # kwargs={
'instrument': instrument.split('.')[0], # 'instrument': instrument.split('.')[0],
'fh': fh # 'fh': fh
}, # },
key=f'{instrument.split('.')[0]}-order', # key=f'{instrument.split('.')[0]}-order',
) as (cache_hit, feed): # ) as (cache_hit, feed):
if cache_hit: # if cache_hit:
yield broadcast_receiver(feed, 10) # yield broadcast_receiver(feed, 10)
else: # else:
yield feed # yield feed

View File

@ -18,56 +18,58 @@
Deribit backend. Deribit backend.
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import Any, Optional, Callable from typing import (
from pprint import pformat # Any,
# Optional,
Callable,
)
# from pprint import pformat
import time import time
import cryptofeed
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
now,
) )
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.accounting import ( from piker.accounting import (
Asset,
MktPair, MktPair,
unpack_fqme, unpack_fqme,
) )
from piker.brokers import ( from piker.brokers import (
open_cached_client, open_cached_client,
NoData, NoData,
DataUnavailable,
) )
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
) )
from piker.log import get_logger, get_console_log from piker.log import (
from piker.data import ShmArray get_logger,
mk_repr,
)
from piker.data.validate import FeedInit from piker.data.validate import FeedInit
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
from cryptofeed import FeedHandler
from cryptofeed.defines import (
DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Trade, Client,
get_config, # get_config,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst, piker_sym_to_cb_sym,
cb_sym_to_deribit_inst,
str_to_cb_sym,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import ( from .venues import (
Pair, Pair,
OptionPair, OptionPair,
Trade,
) )
_spawn_kwargs = { _spawn_kwargs = {
@ -86,6 +88,10 @@ async def open_history_client(
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc( async def get_ohlc(
timeframe: float, timeframe: float,
end_dt: datetime | None = None, end_dt: datetime | None = None,
@ -105,6 +111,31 @@ async def open_history_client(
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData( raise NoData(
f'No frame for {start_dt} -> {end_dt}\n' f'No frame for {start_dt} -> {end_dt}\n'
) )
@ -126,14 +157,20 @@ async def open_history_client(
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield (
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache() @async_lifo_cache()
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[MktPair, Pair] | None: ) -> tuple[MktPair, Pair|OptionPair] | None:
# uppercase since kraken bs_mktid is always upper # uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower(): if 'deribit' not in fqme.lower():
@ -149,7 +186,7 @@ async def get_mkt_info(
# returns, always! # returns, always!
expiry: str = expiry.upper() expiry: str = expiry.upper()
venue: str = venue.upper() venue: str = venue.upper()
venue_lower: str = venue.lower() # venue_lower: str = venue.lower()
mkt_mode: str = 'option' mkt_mode: str = 'option'
@ -175,64 +212,88 @@ async def get_mkt_info(
price_tick=pair.price_tick, price_tick=pair.price_tick,
size_tick=pair.size_tick, size_tick=pair.size_tick,
bs_mktid=pair.symbol, bs_mktid=pair.symbol,
expiry=pair.expiry,
venue=mkt_mode, venue=mkt_mode,
broker='deribit', broker='deribit',
_atype=mkt_mode, _atype=mkt_mode,
_fqme_without_src=True, _fqme_without_src=True,
# expiry=pair.expiry,
# XXX TODO, currently we don't use it since it's
# already "described" in the `OptionPair.symbol: str`
# and if we slap in the ISO repr it's kinda hideous..
# -[ ] figure out the best either std
) )
return mkt, pair return mkt, pair
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
symbols: list[str], symbols: list[str],
feed_is_live: trio.Event, feed_is_live: trio.Event,
loglevel: str = None,
# startup sync # startup sync
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging '''
get_console_log(loglevel or tractor.current_actor().loglevel) Open a live quote stream for the market set defined by `symbols`.
Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
task and relays through L1 and `Trade` msgs here to our `trio.Task`.
'''
sym = symbols[0].split('.')[0] sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = [] init_msgs: list[FeedInit] = []
# multiline nested `dict` formatter (since rn quote-msgs are
# just that).
pfmt: Callable[[str], str] = mk_repr(
# so we can see `deribit`'s delightfully mega-long bs fields..
maxstring=100,
)
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan send_chan as send_chan
): ):
mkt: MktPair
pair: Pair
mkt, pair = await get_mkt_info(sym) mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec # build out init msgs according to latest spec
init_msgs.append( init_msgs.append(
FeedInit(mkt_info=mkt) FeedInit(
mkt_info=mkt,
)
) )
nsym = piker_sym_to_cb_sym(sym) # build `cryptofeed` feed-handle
cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream: from_cf: tractor.to_asyncio.LinkedTaskChannel
async with maybe_open_price_feed(sym) as from_cf:
cache = client._pairs # load the "last trades" summary
last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
cb_sym_to_deribit_inst(cf_sym),
count=1,
)
last_trades: list[Trade] = last_trades_res.trades
last_trades = (await client.last_trades( # TODO, do we even need this or will the above always
cb_sym_to_deribit_inst(nsym), count=1)).trades # work?
# if not last_trades:
# await tractor.pause()
# async for typ, quote in from_cf:
# if typ == 'trade':
# last_trade = Trade(**(quote['data']))
# break
if len(last_trades) == 0: # else:
last_trade = None last_trade = Trade(
async for typ, quote in stream: **(last_trades[0])
if typ == 'trade': )
last_trade = Trade(**(quote['data']))
break
else: first_quote: dict = {
last_trade = Trade(**(last_trades[0]))
first_quote = {
'symbol': sym, 'symbol': sym,
'last': last_trade.price, 'last': last_trade.price,
'brokerd_ts': last_trade.timestamp, 'brokerd_ts': last_trade.timestamp,
@ -243,13 +304,84 @@ async def stream_quotes(
'broker_ts': last_trade.timestamp 'broker_ts': last_trade.timestamp
}] }]
} }
task_status.started((init_msgs, first_quote)) task_status.started((
init_msgs,
first_quote,
))
feed_is_live.set() feed_is_live.set()
async for typ, quote in stream: # NOTE XXX, static for now!
topic = quote['symbol'] # => since this only handles ONE mkt feed at a time we
await send_chan.send({topic: quote}) # don't need a lookup table to map interleaved quotes
# from multiple possible mkt-pairs
topic: str = mkt.bs_fqme
# deliver until cancelled
async for typ, ref in from_cf:
match typ:
case 'trade':
trade: cryptofeed.types.Trade = ref
# TODO, re-impl this according to teh ideal
# fqme for opts that we choose!!
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(trade.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'last': trade.price,
'broker_ts': time.time(),
# ^TODO, name this `brokerd/datad_ts` and
# use `time.time_ns()` ??
'ticks': [{
'type': 'trade',
'price': float(trade.price),
'size': float(trade.amount),
'broker_ts': trade.timestamp,
}],
}
log.info(
f'deribit {typ!r} quote for {sym!r}\n\n'
f'{trade}\n\n'
f'{pfmt(piker_quote)}\n'
)
case 'l1':
book: cryptofeed.types.L1Book = ref
# TODO, so this is where we can possibly change things
# and instead lever the `MktPair.bs_fqme: str` output?
bs_fqme: str = cb_sym_to_deribit_inst(
str_to_cb_sym(book.symbol)
).lower()
piker_quote: dict = {
'symbol': bs_fqme,
'ticks': [
{'type': 'bid',
'price': float(book.bid_price),
'size': float(book.bid_size)},
{'type': 'bsize',
'price': float(book.bid_price),
'size': float(book.bid_size),},
{'type': 'ask',
'price': float(book.ask_price),
'size': float(book.ask_size),},
{'type': 'asize',
'price': float(book.ask_price),
'size': float(book.ask_size),}
]
}
await send_chan.send({
topic: piker_quote,
})
@tractor.context @tractor.context
@ -259,13 +391,13 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
cache = client._pairs # cache = client._pairs
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str pattern: str
async for pattern in stream: async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within # NOTE: pattern fuzzy-matching is done within
# the methd impl. # the methd impl.
pairs: dict[str, Pair] = await client.search_symbols( pairs: dict[str, Pair] = await client.search_symbols(

View File

@ -22,11 +22,10 @@ from __future__ import annotations
import pendulum import pendulum
from typing import ( from typing import (
Literal, Literal,
Optional,
) )
from decimal import Decimal from decimal import Decimal
from msgspec import field
from piker.types import Struct from piker.types import Struct
@ -111,18 +110,21 @@ class OptionPair(Pair, frozen=True):
block_trade_min_trade_amount: int # '25' block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003' block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair' ns_path: str = 'piker.brokers.deribit:OptionPair'
# TODO, impl this without the MM:SS part of
# the `'THH:MM:SS..'` etc..
@property @property
def expiry(self) -> str: def expiry(self) -> str:
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat() iso_date = pendulum.from_timestamp(
self.expiration_timestamp / 1000
).isoformat()
return iso_date return iso_date
@property @property
def venue(self) -> str: def venue(self) -> str:
return 'option' return f'{self.instrument_type}_option'
@property @property
def bs_fqme(self) -> str: def bs_fqme(self) -> str:
@ -152,6 +154,7 @@ class JSONRPCResult(Struct):
error: Optional[dict] = None error: Optional[dict] = None
result: Optional[list[dict]] = None result: Optional[list[dict]] = None
class JSONRPCChannel(Struct): class JSONRPCChannel(Struct):
method: str method: str
params: dict params: dict
@ -168,6 +171,7 @@ class KLinesResult(Struct):
status: str status: str
volume: list[float] volume: list[float]
class Trade(Struct): class Trade(Struct):
iv: float iv: float
price: float price: float
@ -186,6 +190,7 @@ class Trade(Struct):
block_trade_id: Optional[str] = '', block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0, block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct): class LastTradesResult(Struct):
trades: list[Trade] trades: list[Trade]
has_more: bool has_more: bool

View File

@ -434,21 +434,32 @@ async def start_backfill(
# - some other unknown error (ib blocking the # - some other unknown error (ib blocking the
# history bc they don't want you seeing how they # history bc they don't want you seeing how they
# cucked all the tinas..) # cucked all the tinas..)
if dur := frame_types.get(timeframe): if (
# decrement by a frame's worth of duration and frame_types
# retry a few times. and
last_start_dt.subtract( (dur := frame_types.get(timeframe))
):
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
orig_last_start_dt = last_start_dt
last_start_dt = last_start_dt.subtract(
seconds=dur.total_seconds() seconds=dur.total_seconds()
) )
log.warning( log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n' f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n' f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n' f'Decrementing `end_dt` by {dur} and retry..\n\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n' f'orig_last_start_dt: {orig_last_start_dt}\n'
f'dur subtracted last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
) )
continue continue
raise
# broker says there never was or is no more history to pull # broker says there never was or is no more history to pull
except DataUnavailable: except DataUnavailable:
log.warning( log.warning(