Compare commits

..

No commits in common. "28e8628c61db0ad4ffaf896ed8ff6cddca1b5607" and "9232d0944034dc9064570afceaa9057499505466" have entirely different histories.

4 changed files with 124 additions and 226 deletions

View File

@ -28,8 +28,6 @@ from decimal import (
Decimal, Decimal,
) )
from functools import partial from functools import partial
from pathlib import Path
from pprint import pformat
import time import time
from typing import ( from typing import (
Any, Any,
@ -39,6 +37,8 @@ from typing import (
from pendulum import now from pendulum import now
import trio import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
from tractor.trionics import ( from tractor.trionics import (
broadcast_receiver, broadcast_receiver,
@ -55,10 +55,8 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
# types for managing the cb callbacks. # types for managing the cb callbacks.
# from cryptofeed.types import L1Book # from cryptofeed.types import L1Book
from piker.brokers import SymbolNotFound
from .venues import ( from .venues import (
_ws_url, _ws_url,
MarketType, MarketType,
@ -66,9 +64,9 @@ from .venues import (
Pair, Pair,
OptionPair, OptionPair,
JSONRPCResult, JSONRPCResult,
# JSONRPCChannel, JSONRPCChannel,
KLinesResult, KLinesResult,
# Trade, Trade,
LastTradesResult, LastTradesResult,
) )
from piker.accounting import ( from piker.accounting import (
@ -79,7 +77,7 @@ from piker.accounting import (
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
# Struct, Struct,
) )
from piker.data._web_bs import ( from piker.data._web_bs import (
open_jsonrpc_session open_jsonrpc_session
@ -99,7 +97,7 @@ _spawn_kwargs = {
# convert datetime obj timestamp to unixtime in milliseconds # convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when) -> int: def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) return int((when.timestamp() * 1000) + (when.microsecond / 1000))
@ -181,18 +179,16 @@ def get_config() -> dict[str, Any]:
conf: dict conf: dict
path: Path path: Path
conf, path = config.load( conf, path = config.load(
conf_name='brokers', conf_name='brokers',
touch_if_dne=True, touch_if_dne=True,
) )
section: dict|None = conf.get('deribit') section: dict = {}
section = conf.get('deribit')
if section is None: if section is None:
raise ValueError( log.warning(f'No config section found for deribit in {path}')
f'No `[deribit]` section found in\n' return {}
f'{path!r}\n\n'
f'See the template config from the core repo for samples..\n'
# f'<TODO put repo link here??>'
)
conf_option = section.get('option', {}) conf_option = section.get('option', {})
section.clear # clear the dict to reuse it section.clear # clear the dict to reuse it
@ -227,11 +223,7 @@ class Client:
self._auth_ts = None self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth self._auth_renew_ts = 5 # seconds to renew auth
async def _json_rpc_auth_wrapper( async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
self,
*args,
**kwargs,
) -> JSONRPCResult:
"""Background task that adquires a first access token and then will """Background task that adquires a first access token and then will
refresh the access token. refresh the access token.
@ -258,6 +250,9 @@ class Client:
return await self.json_rpc(*args, **kwargs) return await self.json_rpc(*args, **kwargs)
async def get_balances( async def get_balances(
self, self,
kind: str = 'option' kind: str = 'option'
@ -282,29 +277,23 @@ class Client:
venue: str | None = None, venue: str | None = None,
) -> dict[str, Asset]: ) -> dict[str, Asset]:
''' """Return the set of asset balances for this account
Return the set of asset balances for this account by symbol.
by (deribit's) symbol. """
'''
assets = {} assets = {}
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
'public/get_currencies', 'public/get_currencies',
params={} params={}
) )
currencies: list[dict] = resp.result currencies = resp.result
for currency in currencies: for currency in currencies:
name: str = currency['currency'] name = currency['currency']
tx_tick: Decimal = digits_to_dec(currency['fee_precision']) tx_tick = digits_to_dec(currency['fee_precision'])
atype='crypto_currency'
# TODO, handling of options, futures, perps etc. more
# specifically with diff `.atype`s?
assets[name] = Asset( assets[name] = Asset(
name=name, name=name,
atype='crypto_currency', atype=atype,
tx_tick=tx_tick, tx_tick=tx_tick)
)
instruments = await self.symbol_info(currency=name) instruments = await self.symbol_info(currency=name)
for instrument in instruments: for instrument in instruments:
@ -312,8 +301,7 @@ class Client:
assets[pair.symbol] = Asset( assets[pair.symbol] = Asset(
name=pair.symbol, name=pair.symbol,
atype=pair.venue, atype=pair.venue,
tx_tick=pair.size_tick, tx_tick=pair.size_tick)
)
return assets return assets
@ -370,19 +358,6 @@ class Client:
return cached_pair return cached_pair
if sym: if sym:
opt: OptionPair|None = pair_table.get(sym)
if not opt:
closest_matches: dict[str, Pair] = match_from_pairs(
pairs=pair_table,
query=sym,
score_cutoff=40,
)
closest_syms: list[str] = list(closest_matches.keys())
raise ValueError(
f'No contract found for {sym!r}\n\n'
f'Closest {len(closest_syms)} available contracts:\n\n'
f'{pformat(closest_syms)}\n'
)
return pair_table[sym] return pair_table[sym]
else: else:
return self._pairs return self._pairs
@ -406,7 +381,7 @@ class Client:
params: dict[str, str] = { params: dict[str, str] = {
'currency': currency.upper(), 'currency': currency.upper(),
'kind': kind, 'kind': kind,
'expired': expired, 'expired': str(expired).lower()
} }
resp: JSONRPCResult = await self._json_rpc_auth_wrapper( resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
@ -414,7 +389,7 @@ class Client:
params, params,
) )
# convert to symbol-keyed table # convert to symbol-keyed table
pair_type: Pair = PAIRTYPES[kind] pair_type: Type = PAIRTYPES[kind]
results: list[dict] | None = resp.result results: list[dict] | None = resp.result
instruments: dict[str, Pair] = {} instruments: dict[str, Pair] = {}
@ -452,15 +427,12 @@ class Client:
mkt_pairs = await self.symbol_info() mkt_pairs = await self.symbol_info()
if not mkt_pairs: if not mkt_pairs:
raise SymbolNotFound( raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
f'No market pairs found!?:\n'
f'{mkt_pairs}'
)
pairs_view_subtable: dict[str, Pair] = {} pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs: for instrument in mkt_pairs:
pair_type: Pair|OptionPair = PAIRTYPES[venue] pair_type: Type = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
@ -508,14 +480,12 @@ class Client:
if end_dt is None: if end_dt is None:
end_dt = now('UTC') end_dt = now('UTC')
_orig_start_dt = start_dt
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
'minute' 'minute').subtract(minutes=limit)
).subtract(minutes=limit)
start_time: int = deribit_timestamp(start_dt) start_time = deribit_timestamp(start_dt)
end_time: int = deribit_timestamp(end_dt) end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data # https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self._json_rpc_auth_wrapper( resp = await self._json_rpc_auth_wrapper(
@ -529,12 +499,8 @@ class Client:
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars: list[tuple] = [] new_bars: list[tuple] = []
# if _orig_start_dt is None:
# if not new_bars:
# import tractor
# await tractor.pause()
for i in range(len(result.close)): for i in range(len(result.close)):
row = [ row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
@ -702,69 +668,68 @@ async def maybe_open_price_feed(
# TODO, move all to `.broker` submod! async def aio_order_feed_relay(
# async def aio_order_feed_relay( fh: FeedHandler,
# fh: FeedHandler, instrument: Symbol,
# instrument: Symbol, from_trio: asyncio.Queue,
# from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel,
# to_trio: trio.abc.SendChannel, ) -> None:
# ) -> None: async def _fill(data: dict, receipt_timestamp):
# async def _fill(data: dict, receipt_timestamp): breakpoint()
# breakpoint()
# async def _order_info(data: dict, receipt_timestamp): async def _order_info(data: dict, receipt_timestamp):
# breakpoint() breakpoint()
# fh.add_feed( fh.add_feed(
# DERIBIT, DERIBIT,
# channels=[FILLS, ORDER_INFO], channels=[FILLS, ORDER_INFO],
# symbols=[instrument.upper()], symbols=[instrument.upper()],
# callbacks={ callbacks={
# FILLS: _fill, FILLS: _fill,
# ORDER_INFO: _order_info, ORDER_INFO: _order_info,
# }) })
# if not fh.running: if not fh.running:
# fh.run( fh.run(
# start_loop=False, start_loop=False,
# install_signal_handlers=False) install_signal_handlers=False)
# # sync with trio # sync with trio
# to_trio.send_nowait(None) to_trio.send_nowait(None)
# await asyncio.sleep(float('inf')) await asyncio.sleep(float('inf'))
# @acm @acm
# async def open_order_feed( async def open_order_feed(
# instrument: list[str] instrument: list[str]
# ) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# async with maybe_open_feed_handler() as fh: async with maybe_open_feed_handler() as fh:
# async with to_asyncio.open_channel_from( async with to_asyncio.open_channel_from(
# partial( partial(
# aio_order_feed_relay, aio_order_feed_relay,
# fh, fh,
# instrument instrument
# ) )
# ) as (first, chan): ) as (first, chan):
# yield chan yield chan
# @acm @acm
# async def maybe_open_order_feed( async def maybe_open_order_feed(
# instrument: str instrument: str
# ) -> trio.abc.ReceiveStream: ) -> trio.abc.ReceiveStream:
# # TODO: add a predicate to maybe_open_context # TODO: add a predicate to maybe_open_context
# async with maybe_open_context( async with maybe_open_context(
# acm_func=open_order_feed, acm_func=open_order_feed,
# kwargs={ kwargs={
# 'instrument': instrument.split('.')[0], 'instrument': instrument.split('.')[0],
# 'fh': fh 'fh': fh
# }, },
# key=f'{instrument.split('.')[0]}-order', key=f'{instrument.split('.')[0]}-order',
# ) as (cache_hit, feed): ) as (cache_hit, feed):
# if cache_hit: if cache_hit:
# yield broadcast_receiver(feed, 10) yield broadcast_receiver(feed, 10)
# else: else:
# yield feed yield feed

View File

@ -18,65 +18,56 @@
Deribit backend. Deribit backend.
''' '''
from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import ( from typing import Any, Optional, Callable
# Any, from pprint import pformat
# Optional,
Callable,
)
# from pprint import pformat
import time import time
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from pendulum import ( from pendulum import (
from_timestamp, from_timestamp,
now,
) )
from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.accounting import ( from piker.accounting import (
Asset,
MktPair, MktPair,
unpack_fqme, unpack_fqme,
) )
from piker.brokers import ( from piker.brokers import (
open_cached_client, open_cached_client,
NoData, NoData,
DataUnavailable,
) )
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
) )
from piker.log import ( from piker.log import get_logger, get_console_log
get_logger, from piker.data import ShmArray
)
from piker.data.validate import FeedInit from piker.data.validate import FeedInit
from piker.brokers._util import (
BrokerError,
DataUnavailable,
)
# from cryptofeed import FeedHandler from cryptofeed import FeedHandler
# from cryptofeed.defines import ( from cryptofeed.defines import (
# DERIBIT, DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
# L1_BOOK, )
# TRADES, from cryptofeed.symbols import Symbol
# OPTION,
# CALL,
# PUT,
# )
# from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Client, Trade,
# get_config, get_config,
piker_sym_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
cb_sym_to_deribit_inst,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import ( from .venues import (
Pair, Pair,
OptionPair, OptionPair,
Trade,
) )
_spawn_kwargs = { _spawn_kwargs = {
@ -95,10 +86,6 @@ async def open_history_client(
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
pair: OptionPair = client._pairs[mkt.dst.name]
# XXX NOTE, the cuckers use ms !!!
creation_time_s: int = pair.creation_timestamp/1000
async def get_ohlc( async def get_ohlc(
timeframe: float, timeframe: float,
end_dt: datetime | None = None, end_dt: datetime | None = None,
@ -118,31 +105,6 @@ async def open_history_client(
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
if (
end_dt is None
):
raise DataUnavailable(
'No history seems to exist yet?\n\n'
f'{mkt}'
)
elif (
end_dt
and
end_dt.timestamp() < creation_time_s
):
# the contract can't have history
# before it was created.
pair_type_str: str = type(pair).__name__
create_dt: datetime = from_timestamp(creation_time_s)
raise DataUnavailable(
f'No history prior to\n'
f'`{pair_type_str}.creation_timestamp: int = '
f'{pair.creation_timestamp}\n\n'
f'------ deribit sux ------\n'
f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
f'creation_time_s: {creation_time_s}\n'
f'create_dt: {create_dt}\n'
)
raise NoData( raise NoData(
f'No frame for {start_dt} -> {end_dt}\n' f'No frame for {start_dt} -> {end_dt}\n'
) )
@ -164,20 +126,14 @@ async def open_history_client(
return array, start_dt, end_dt return array, start_dt, end_dt
yield ( yield get_ohlc, {'erlangs': 3, 'rate': 3}
get_ohlc,
{ # backfill config
'erlangs': 3,
'rate': 3,
}
)
@async_lifo_cache() @async_lifo_cache()
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[MktPair, Pair|OptionPair] | None: ) -> tuple[MktPair, Pair] | None:
# uppercase since kraken bs_mktid is always upper # uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower(): if 'deribit' not in fqme.lower():
@ -193,7 +149,7 @@ async def get_mkt_info(
# returns, always! # returns, always!
expiry: str = expiry.upper() expiry: str = expiry.upper()
venue: str = venue.upper() venue: str = venue.upper()
# venue_lower: str = venue.lower() venue_lower: str = venue.lower()
mkt_mode: str = 'option' mkt_mode: str = 'option'
@ -239,6 +195,8 @@ async def stream_quotes(
task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0].split('.')[0] sym = symbols[0].split('.')[0]
@ -259,8 +217,7 @@ async def stream_quotes(
async with maybe_open_price_feed(sym) as stream: async with maybe_open_price_feed(sym) as stream:
# TODO, uhh use it ?? XD cache = client._pairs
# cache = client._pairs
last_trades = (await client.last_trades( last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades cb_sym_to_deribit_inst(nsym), count=1)).trades
@ -290,16 +247,9 @@ async def stream_quotes(
feed_is_live.set() feed_is_live.set()
# deliver until cancelled
async for typ, quote in stream: async for typ, quote in stream:
topic: str = quote['symbol'] topic = quote['symbol']
log.info( await send_chan.send({topic: quote})
f'deribit {typ!r} quote\n\n'
f'{quote}\n'
)
await send_chan.send({
topic: quote,
})
@tractor.context @tractor.context
@ -309,14 +259,13 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
# cache = client._pairs cache = client._pairs
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str pattern: str
async for pattern in stream: async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within # NOTE: pattern fuzzy-matching is done within
# the methd impl. # the methd impl.
pairs: dict[str, Pair] = await client.search_symbols( pairs: dict[str, Pair] = await client.search_symbols(

View File

@ -22,7 +22,6 @@ from __future__ import annotations
import pendulum import pendulum
from typing import ( from typing import (
Literal, Literal,
Optional,
) )
from decimal import Decimal from decimal import Decimal
@ -112,6 +111,7 @@ class OptionPair(Pair, frozen=True):
block_trade_min_trade_amount: int # '25' block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003' block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair' ns_path: str = 'piker.brokers.deribit:OptionPair'
@ -122,7 +122,7 @@ class OptionPair(Pair, frozen=True):
@property @property
def venue(self) -> str: def venue(self) -> str:
return f'{self.instrument_type}_option' return 'option'
@property @property
def bs_fqme(self) -> str: def bs_fqme(self) -> str:

View File

@ -360,7 +360,7 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing JSONRPC response-request style machinery for transparent multiplexing
of msgs over a `NoBsWs`. of msgs over a NoBsWs.
''' '''
@ -377,16 +377,6 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
msg_recv_timeout: float = float('inf'),
# ^NOTE, since only `deribit` is using this jsonrpc stuff atm
# and options mkts are generally "slow moving"..
#
# FURTHER if we break the underlying ws connection then since we
# don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
# `_reconnect_forever()`, the jsonrpc "transport pipe" get's
# broken and never restored with wtv init sequence is required to
# re-establish a working req-resp session.
# request_type: Optional[type] = None, # request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None, # request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None, # error_hook: Optional[Callable] = None,
@ -398,18 +388,12 @@ async def open_jsonrpc_session(
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws( open_autorecon_ws(url) as ws
url=url,
msg_recv_timeout=msg_recv_timeout,
) as ws
): ):
rpc_id: Iterable[int] = count(start_id) rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc( async def json_rpc(method: str, params: dict) -> dict:
method: str,
params: dict,
) -> dict:
''' '''
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response