Compare commits


5 Commits

Author SHA1 Message Date
Nelson Torres ffbca17ba2 Deribit's feed fix
- Use `FeedInit` for `init_msgs` in `stream_quotes`.

- New `client_pairs` cache replaces the old `client.cache_symbols`.

- `get_mkt_info` added.

- `get_ohlc` fixed to comply with the new feed conventions.
2025-01-29 19:24:57 -03:00
Nelson Torres 75891fcac9 Deribit's api fix
Key changes:

- Resolved the issue with Deribit's expiration dates: we now use plain `int` timestamps instead of the crazy custom Deribit format.

- The client now has a new `_json_rpc_auth_wrapper` that acquires an initial access token and then refreshes it whenever it expires.

- `get_assets` fixed: we now use the public endpoint to check the available assets. This will probably change in the future, but for now it works just fine.

- `get_mkt_pairs` added.

- `exch_info` added.

- `cache_symbols` fixed.

- Also a fair amount of reformatting in `api`.
2025-01-29 19:24:57 -03:00
Nelson Torres 3fb71e9b93 Venues
Moved all the msgspec structs from `api` to `venues`; also added critical imports in the `api`, `feed` and `__init__` mods.
2025-01-29 19:24:57 -03:00
Nelson Torres 3b90f9c5e8 New deps in the `pyproject.toml`:
- `cryptofeed` 2.4.0
- `pyarrow` 17.0.0
- `poetry.lock` updated too.
2025-01-29 19:24:57 -03:00
Tyler Goodlet 91398b7b4f data._web_bs: try to raise jsonrpc errors in parent task 2025-01-29 19:16:55 -03:00
16 changed files with 1130 additions and 3712 deletions

View File

@@ -1,130 +1,82 @@
 with (import <nixpkgs> {});
+with python312Packages;
 let
   glibStorePath = lib.getLib glib;
-  zstdStorePath = lib.getLib zstd;
-  dbusStorePath = lib.getLib dbus;
-  libGLStorePath = lib.getLib libGL;
-  freetypeStorePath = lib.getLib freetype;
+  qtpyStorePath = lib.getLib qtpy;
+  pyqt6StorePath = lib.getLib pyqt6;
+  pyqt6SipStorePath = lib.getLib pyqt6-sip;
   qt6baseStorePath = lib.getLib qt6.qtbase;
-  fontconfigStorePath = lib.getLib fontconfig;
-  libxkbcommonStorePath = lib.getLib libxkbcommon;
-  xcbutilcursorStorePath = lib.getLib xcb-util-cursor;
-  qtpyStorePath = lib.getLib python312Packages.qtpy;
-  pyqt6StorePath = lib.getLib python312Packages.pyqt6;
-  pyqt6SipStorePath = lib.getLib python312Packages.pyqt6-sip;
-  rapidfuzzStorePath = lib.getLib python312Packages.rapidfuzz;
-  qdarkstyleStorePath = lib.getLib python312Packages.qdarkstyle;
-  xorgLibX11StorePath = lib.getLib xorg.libX11;
-  xorgLibxcbStorePath = lib.getLib xorg.libxcb;
-  xorgxcbutilwmStorePath = lib.getLib xorg.xcbutilwm;
-  xorgxcbutilimageStorePath = lib.getLib xorg.xcbutilimage;
-  xorgxcbutilerrorsStorePath = lib.getLib xorg.xcbutilerrors;
-  xorgxcbutilkeysymsStorePath = lib.getLib xorg.xcbutilkeysyms;
-  xorgxcbutilrenderutilStorePath = lib.getLib xorg.xcbutilrenderutil;
+  rapidfuzzStorePath = lib.getLib rapidfuzz;
+  qdarkstyleStorePath = lib.getLib qdarkstyle;
 in
 stdenv.mkDerivation {
-  name = "piker-qt6-uv";
+  name = "piker-qt6-poetry-shell";
   buildInputs = [
     # System requirements.
     glib
-    dbus
-    zstd
-    libGL
-    freetype
     qt6.qtbase
     libgcc.lib
-    fontconfig
-    libxkbcommon
-
-    # Xorg requirements
-    xcb-util-cursor
-    xorg.libxcb
-    xorg.libX11
-    xorg.xcbutilwm
-    xorg.xcbutilimage
-    xorg.xcbutilerrors
-    xorg.xcbutilkeysyms
-    xorg.xcbutilrenderutil

     # Python requirements.
     python312Full
-    python312Packages.uv
-    python312Packages.qdarkstyle
-    python312Packages.rapidfuzz
-    python312Packages.pyqt6
-    python312Packages.qtpy
+    poetry-core
+    qdarkstyle
+    rapidfuzz
+    pyqt6
+    qtpy
   ];
   src = null;
   shellHook = ''
     set -e
-    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${libgcc.lib}/lib:${glibStorePath}/lib

     # Set the Qt plugin path
     # export QT_DEBUG_PLUGINS=1
-    QTBASE_PATH="${qt6baseStorePath}/lib"
-    QT_PLUGIN_PATH="$QTBASE_PATH/qt-6/plugins"
-    QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
-
-    LIB_GCC_PATH="${libgcc.lib}/lib"
-    GLIB_PATH="${glibStorePath}/lib"
-    ZSTD_PATH="${zstdStorePath}/lib"
-    DBUS_PATH="${dbusStorePath}/lib"
-    LIBGL_PATH="${libGLStorePath}/lib"
-    FREETYPE_PATH="${freetypeStorePath}/lib"
-    FONTCONFIG_PATH="${fontconfigStorePath}/lib"
-    LIB_XKB_COMMON_PATH="${libxkbcommonStorePath}/lib"
-
-    XCB_UTIL_CURSOR_PATH="${xcbutilcursorStorePath}/lib"
-    XORG_LIB_X11_PATH="${xorgLibX11StorePath}/lib"
-    XORG_LIB_XCB_PATH="${xorgLibxcbStorePath}/lib"
-    XORG_XCB_UTIL_IMAGE_PATH="${xorgxcbutilimageStorePath}/lib"
-    XORG_XCB_UTIL_WM_PATH="${xorgxcbutilwmStorePath}/lib"
-    XORG_XCB_UTIL_RENDER_UTIL_PATH="${xorgxcbutilrenderutilStorePath}/lib"
-    XORG_XCB_UTIL_KEYSYMS_PATH="${xorgxcbutilkeysymsStorePath}/lib"
-    XORG_XCB_UTIL_ERRORS_PATH="${xorgxcbutilerrorsStorePath}/lib"
-
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QTBASE_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QT_PLUGIN_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$QT_QPA_PLATFORM_PLUGIN_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIB_GCC_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$DBUS_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$GLIB_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$ZSTD_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIBGL_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$FONTCONFIG_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$FREETYPE_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$LIB_XKB_COMMON_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XCB_UTIL_CURSOR_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_LIB_X11_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_LIB_XCB_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_IMAGE_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_WM_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_RENDER_UTIL_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_KEYSYMS_PATH"
-    LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$XORG_XCB_UTIL_ERRORS_PATH"
-
-    export LD_LIBRARY_PATH
-
-    RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
-    QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
-    QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
-    PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
-    PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
-
-    PATCH="$PATCH:$RPDFUZZ_PATH"
-    PATCH="$PATCH:$QDRKSTYLE_PATH"
-    PATCH="$PATCH:$QTPY_PATH"
-    PATCH="$PATCH:$PYQT6_PATH"
-    PATCH="$PATCH:$PYQT6_SIP_PATH"
-
-    export PATCH
-
-    # Install deps
-    uv lock
+    QTBASE_PATH="${qt6baseStorePath}"
+    echo "qtbase path: $QTBASE_PATH"
+    echo ""
+    export QT_PLUGIN_PATH="$QTBASE_PATH/lib/qt-6/plugins"
+    export QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
+    echo "qt plugin path: $QT_PLUGIN_PATH"
+    echo ""
+
+    # Maybe create venv & install deps
+    poetry install --with uis
+
+    # Use pyqt6 from System, patch activate script
+    ACTIVATE_SCRIPT_PATH="$(poetry env info --path)/bin/activate"
+
+    export RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
+    export QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
+    export QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
+    export PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
+    export PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
+    echo "rapidfuzz at: $RPDFUZZ_PATH"
+    echo "qdarkstyle at: $QDRKSTYLE_PATH"
+    echo "qtpy at: $QTPY_PATH"
+    echo "pyqt6 at: $PYQT6_PATH"
+    echo "pyqt6-sip at: $PYQT6_SIP_PATH"
+    echo ""
+
+    PATCH="export PYTHONPATH=\""
+    PATCH="$PATCH\$RPDFUZZ_PATH"
+    PATCH="$PATCH:\$QDRKSTYLE_PATH"
+    PATCH="$PATCH:\$QTPY_PATH"
+    PATCH="$PATCH:\$PYQT6_PATH"
+    PATCH="$PATCH:\$PYQT6_SIP_PATH"
+    PATCH="$PATCH\""
+
+    if grep -q "$PATCH" "$ACTIVATE_SCRIPT_PATH"; then
+      echo "venv is already patched."
+    else
+      echo "patching $ACTIVATE_SCRIPT_PATH to use pyqt6 from nixos..."
+      sed -i "\$i$PATCH" $ACTIVATE_SCRIPT_PATH
+    fi
+
+    poetry shell
   '';
 }

View File

@@ -1,139 +0,0 @@
#!/usr/bin/env python
from decimal import (
Decimal,
)
import trio
import tractor
from datetime import datetime
from pprint import pformat
from piker.brokers.deribit.api import (
get_client,
maybe_open_oi_feed,
)
def check_if_complete(
oi: dict[str, dict[str, Decimal | None]]
) -> bool:
return all(
oi[strike]['C'] is not None
and
oi[strike]['P'] is not None for strike in oi
)
async def max_pain_daemon(
) -> None:
oi_by_strikes: dict[str, dict[str, Decimal | None]]
expiry_dates: list[str]
currency: str = 'btc'
kind: str = 'option'
async with get_client(
) as client:
expiry_dates: list[str] = await client.get_expiration_dates(
currency=currency,
kind=kind
)
print(f'Available expiration dates for {currency}-{kind}:')
print(f'{expiry_dates}')
expiry_date: str = input('Please enter a valid expiration date: ').upper()
print('Starting little daemon...')
instruments: list[Symbol] = []
oi_by_strikes: dict[str, dict[str, Decimal]]
def update_oi_by_strikes(msg: tuple):
nonlocal oi_by_strikes
if 'oi' == msg[0]:
strike_price = msg[1]['strike_price']
option_type = msg[1]['option_type']
open_interest = msg[1]['open_interest']
oi_by_strikes.setdefault(
strike_price, {}
).update(
{option_type: open_interest}
)
def get_max_pain(
oi_by_strikes: dict[str, dict[str, Decimal]]
) -> dict[str, str | Decimal]:
'''
This method requires only the strike_prices and oi for call
and puts, the closes list are the same as the strike_prices
the idea is to sum all the calls and puts cash for each strike
and the ITM strikes from that strike, the lowest value is what we
are looking for the intrinsic value.
'''
nonlocal timestamp
# We meed to find the lowest value, so we start at
# infinity to ensure that, and the max_pain must be
# an amount greater than zero.
total_intrinsic_value: Decimal = Decimal('Infinity')
max_pain: Decimal = Decimal(0)
call_cash: Decimal = Decimal(0)
put_cash: Decimal = Decimal(0)
intrinsic_values: dict[str, dict[str, Decimal]] = {}
closes: list = sorted(Decimal(close) for close in oi_by_strikes)
for strike, oi in oi_by_strikes.items():
s = Decimal(strike)
call_cash = sum(max(0, (s - c) * oi_by_strikes[str(c)]['C']) for c in closes)
put_cash = sum(max(0, (c - s) * oi_by_strikes[str(c)]['P']) for c in closes)
intrinsic_values[strike] = {
'C': call_cash,
'P': put_cash,
'total': call_cash + put_cash,
}
if intrinsic_values[strike]['total'] < total_intrinsic_value:
total_intrinsic_value = intrinsic_values[strike]['total']
max_pain = s
return {
'timestamp': timestamp,
'expiry_date': expiry_date,
'total_intrinsic_value': total_intrinsic_value,
'max_pain': max_pain,
}
async with get_client(
) as client:
instruments = await client.get_instruments(
expiry_date=expiry_date,
)
oi_by_strikes = client.get_strikes_dict(instruments)
async with maybe_open_oi_feed(
instruments,
) as oi_feed:
async for msg in oi_feed:
update_oi_by_strikes(msg)
if check_if_complete(oi_by_strikes):
if 'oi' == msg[0]:
timestamp = msg[1]['timestamp']
max_pain = get_max_pain(oi_by_strikes)
print('-----------------------------------------------')
print(f'timestamp: {datetime.fromtimestamp(max_pain['timestamp'])}')
print(f'expiry_date: {max_pain['expiry_date']}')
print(f'max_pain: {max_pain['max_pain']}')
print(f'total intrinsic value: {max_pain['total_intrinsic_value']}')
print('-----------------------------------------------')
async def main():
async with tractor.open_nursery() as n:
p: tractor.Portal = await n.start_actor(
'max_pain_daemon',
enable_modules=[__name__],
infect_asyncio=True,
)
await p.run(max_pain_daemon)
if __name__ == '__main__':
trio.run(main)
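Reviewer note: for anyone unfamiliar with the metric computed by the (removed) script above — max pain is the strike at which the summed intrinsic value of all open calls and puts is lowest. A minimal, self-contained sketch of the same computation on toy (hypothetical) open-interest numbers:

```python
from decimal import Decimal

# toy open-interest table: strike -> {call OI, put OI}
oi_by_strikes: dict[str, dict[str, Decimal]] = {
    '50000': {'C': Decimal(100), 'P': Decimal(30)},
    '60000': {'C': Decimal(80),  'P': Decimal(60)},
    '70000': {'C': Decimal(20),  'P': Decimal(90)},
}

closes = sorted(Decimal(k) for k in oi_by_strikes)
best = (Decimal('Infinity'), Decimal(0))  # (total intrinsic value, strike)
for strike in closes:
    # cash owed to call holders if price settles at `strike`..
    call_cash = sum(max(Decimal(0), strike - c) * oi_by_strikes[str(c)]['C'] for c in closes)
    # ..plus cash owed to put holders.
    put_cash = sum(max(Decimal(0), c - strike) * oi_by_strikes[str(c)]['P'] for c in closes)
    total = call_cash + put_cash
    if total < best[0]:
        best = (total, strike)

print(f'max pain strike: {best[1]} (total intrinsic value: {best[0]})')
# -> max pain strike: 60000 (total intrinsic value: 1900000)
```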

View File

@@ -1,19 +0,0 @@
## Max Pain Calculation for Deribit Options
This feature calculates the max pain point for options traded on the Deribit exchange using the `cryptofeed` library.
- Functions in the `api` module for fetching options data from Deribit. [commit](https://pikers.dev/pikers/piker/commit/da55856dd2876291f55a06eb0561438a912d8241)
- Computes the max pain point based on open interest data using Deribit's API. [commit](https://pikers.dev/pikers/piker/commit/0d9d6e15ba0edeb662ec97f7599dd66af3046b94)
### How to test it?
**Before you start:** in order to get this working with `uv`, you **must** use my `tractor` [fork](https://pikers.dev/ntorres/tractor/src/branch/aio_abandons), specifically the `aio_abandons` branch. The reason is that I cherry-picked the `uv_migration` work that guille made; for some reason I didn't dig into, on my system `tractor` needs to use `uv` too. Quite hacky, I guess.
1. `uv lock`
2. `uv run --no-dev python examples/max_pain.py`
3. A message should be displayed; enter one of the available expiration dates.
4. The script should then be up and running.

View File

@@ -28,8 +28,6 @@ from decimal import (
     Decimal,
 )
 from functools import partial
-from pathlib import Path
-from pprint import pformat
 import time
 from typing import (
     Any,
@@ -39,6 +37,8 @@ from typing import (
 from pendulum import now
 import trio
+from trio_typing import TaskStatus
+from rapidfuzz import process as fuzzy
 import numpy as np
 from tractor.trionics import (
     broadcast_receiver,
@@ -52,16 +52,11 @@ from cryptofeed import FeedHandler
 from cryptofeed.defines import (
     DERIBIT,
     L1_BOOK, TRADES,
-    OPTION, CALL, PUT,
-    OPEN_INTEREST,
+    OPTION, CALL, PUT
 )
 from cryptofeed.symbols import Symbol
-from cryptofeed.types import (
-    L1Book,
-    Trade,
-    OpenInterest,
-)
-from piker.brokers import SymbolNotFound
+# types for managing the cb callbacks.
+# from cryptofeed.types import L1Book
 from .venues import (
     _ws_url,
     MarketType,
@@ -69,7 +64,9 @@ from .venues import (
     Pair,
     OptionPair,
     JSONRPCResult,
+    JSONRPCChannel,
     KLinesResult,
+    Trade,
     LastTradesResult,
 )
 from piker.accounting import (
@@ -80,7 +77,7 @@ from piker.accounting import (
 from piker.data import (
     def_iohlcv_fields,
     match_from_pairs,
-    # Struct,
+    Struct,
 )
 from piker.data._web_bs import (
     open_jsonrpc_session
@@ -99,21 +96,9 @@ _spawn_kwargs = {
 }


-def deribit_timestamp(when: datetime) -> int:
-    '''
-    Convert conventional epoch timestamp, in secs, to unixtime in
-    milliseconds.
-
-    '''
-    return int(
-        (when.timestamp() * 1000)
-        +
-        (when.microsecond / 1000)
-    )
-
-
-def get_timestamp_int(expiry_date: str) -> int:
-    return int(time.mktime(time.strptime(expiry_date, '%d%b%y')))
+# convert datetime obj timestamp to unixtime in milliseconds
+def deribit_timestamp(when):
+    return int((when.timestamp() * 1000) + (when.microsecond / 1000))


 def str_to_cb_sym(name: str) -> Symbol:
@@ -123,39 +108,31 @@ def str_to_cb_sym(name: str) -> Symbol:
     if option_type == 'put':
         option_type = PUT
     elif option_type == 'call':
        option_type = CALL
    else:
        raise Exception("Couldn\'t parse option type")

-    new_expiry_date: int = get_timestamp_int(
-        get_values_from_cb_normalized_date(expiry_date)
-    )
+    new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
    return Symbol(
        base=base,
        quote=quote,
        type=OPTION,
        strike_price=strike_price,
        option_type=option_type,
-        expiry_date=new_expiry_date
-    )
+        expiry_date=new_expiry_date)


 def piker_sym_to_cb_sym(name: str) -> Symbol:
-    (
-        base,
-        expiry_date,
-        strike_price,
-        option_type,
-    )= tuple(
+    base, expiry_date, strike_price, option_type = tuple(
        name.upper().split('-'))

-    new_expiry_date = get_timestamp_int(expiry_date)
-    quote: str = base
+    quote = base

-    if option_type == 'P' or option_type == 'PUT':
+    if option_type == 'P':
        option_type = PUT
-    elif option_type == 'C' or option_type == 'CALL':
+    elif option_type == 'C':
        option_type = CALL
    else:
        raise Exception("Couldn\'t parse option type")
@@ -166,32 +143,14 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
        type=OPTION,
        strike_price=strike_price,
        option_type=option_type,
-        expiry_date=new_expiry_date
-    )
+        expiry_date=expiry_date)


-# TODO, instead can't we just lookup the `MktPair` directly
-# and pass it upward to `stream_quotes()`??
-def cb_sym_to_deribit_inst(sym: Symbol) -> str:
-    '''
-    Generate our own internal `str`-repr for a `cryptofeed.Symbol`
-    uniquely from its fields.
-
-    This is the equiv of generating a `Pair.fmqe` from `cryptofeed`
-    for now i suppose..?
-
-    '''
+def cb_sym_to_deribit_inst(sym: Symbol):
    new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
-    otype = (
-        'C' if sym.option_type == CALL
-        else 'P'
-    )
-    return (
-        f'{sym.base}-'
-        f'{new_expiry_date}-'
-        f'{sym.strike_price}-'
-        f'{otype}'
-    )
+    otype = 'C' if sym.option_type == CALL else 'P'
+
+    return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'


 def get_values_from_cb_normalized_date(expiry_date: str) -> str:
@@ -220,39 +179,32 @@ def get_config() -> dict[str, Any]:
    conf: dict
    path: Path
    conf, path = config.load(
        conf_name='brokers',
        touch_if_dne=True,
    )
-    section: dict|None = conf.get('deribit')
+    section: dict = {}
+    section = conf.get('deribit')
    if section is None:
-        raise ValueError(
-            f'No `[deribit]` section found in\n'
-            f'{path!r}\n\n'
-            f'See the template config from the core repo for samples..\n'
-            # f'<TODO put repo link here??>'
-        )
+        log.warning(f'No config section found for deribit in {path}')
+        return {}

    conf_option = section.get('option', {})
-    conf_log = conf_option.get('log', {})
-    return {
-        'deribit': {
-            'key_id': conf_option['key_id'],
-            'key_secret': conf_option['key_secret'],
-        },
-        'log': {
-            'filename': conf_log['filename'],
-            'level': conf_log['level'],
-            'disabled': conf_log['disabled'],
-        }
-    }
+    section.clear  # clear the dict to reuse it
+    section['deribit'] = {}
+    section['deribit']['key_id'] = conf_option.get('api_key')
+    section['deribit']['key_secret'] = conf_option.get('api_secret')
+
+    section['log'] = {}
+    section['log']['filename'] = 'feedhandler.log'
+    section['log']['level'] = 'DEBUG'
+
+    return section


 class Client:
-    '''
-    Hi-level interface for the jsron-RPC over websocket API.
-
-    '''
    def __init__(
        self,
@@ -271,12 +223,8 @@ class Client:
        self._auth_ts = None
        self._auth_renew_ts = 5  # seconds to renew auth

-    async def _json_rpc_auth_wrapper(
-        self,
-        *args,
-        **kwargs,
-    ) -> JSONRPCResult:
+    async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
        """Background task that adquires a first access token and then will
        refresh the access token.
@@ -302,6 +250,9 @@ class Client:
        return await self.json_rpc(*args, **kwargs)

+
    async def get_balances(
        self,
        kind: str = 'option'
@@ -321,44 +272,28 @@ class Client:
        return balances

-    async def get_currencies(
-        self,
-    ) -> list[dict]:
-        '''
-        Return the set of currencies for deribit.
-        '''
-        assets = {}
-        resp = await self._json_rpc_auth_wrapper(
-            'public/get_currencies',
-            params={}
-        )
-        return resp.result
-
    async def get_assets(
        self,
        venue: str | None = None,
    ) -> dict[str, Asset]:
-        '''
-        Return the set of asset balances for this account
-        by (deribit's) symbol.
-
-        '''
+        """Return the set of asset balances for this account
+        by symbol.
+        """
        assets = {}
-        currencies = await self.get_currencies()
+        resp = await self._json_rpc_auth_wrapper(
+            'public/get_currencies',
+            params={}
+        )
+        currencies = resp.result
        for currency in currencies:
-            name: str = currency['currency']
-            tx_tick: Decimal = digits_to_dec(currency['fee_precision'])
-
-            # TODO, handling of options, futures, perps etc. more
-            # specifically with diff `.atype`s?
+            name = currency['currency']
+            tx_tick = digits_to_dec(currency['fee_precision'])
+            atype='crypto_currency'
            assets[name] = Asset(
                name=name,
-                atype='crypto_currency',
-                tx_tick=tx_tick,
-            )
+                atype=atype,
+                tx_tick=tx_tick)

            instruments = await self.symbol_info(currency=name)
            for instrument in instruments:
@@ -366,10 +301,9 @@ class Client:
                assets[pair.symbol] = Asset(
                    name=pair.symbol,
                    atype=pair.venue,
-                    tx_tick=pair.size_tick,
-                )
+                    tx_tick=pair.size_tick)

        return assets

    async def get_mkt_pairs(self) -> dict[str, Pair]:
        flat: dict[str, Pair] = {}
@@ -379,82 +313,6 @@ class Client:
        return flat

-    async def get_instruments(
-        self,
-        currency: str = 'btc',
-        kind: str = 'option',
-        expired: bool = False,
-        expiry_date: str = None,
-    ) -> list[Symbol]:
-        """
-        Get instruments for cryptoFeed.FeedHandler.
-        """
-        params: dict[str, str] = {
-            'currency': currency.upper(),
-            'kind': kind,
-            'expired': expired,
-        }
-
-        r: JSONRPCResult = await self._json_rpc_auth_wrapper(
-            'public/get_instruments',
-            params,
-        )
-        resp = r.result
-        response_list = []
-
-        for i in range(len(resp)):
-            element = resp[i]
-            name = f'{element["instrument_name"].split("-")[1]}'
-            if not expiry_date or name == expiry_date.upper():
-                response_list.append(piker_sym_to_cb_sym(element['instrument_name']))
-
-        return response_list
-
-    async def get_expiration_dates(
-        self,
-        currency: str = 'btc',
-        kind: str = 'option',
-    ) -> list[str]:
-        """
-        Get a dict with all expiration dates listed as value and currency as key.
-        """
-        params: dict[str, str] = {
-            'currency': currency.upper(),
-            'kind': kind,
-        }
-
-        r: JSONRPCResult = await self._json_rpc_auth_wrapper(
-            'public/get_expirations',
-            params,
-        )
-        resp = r.result
-        return resp[currency][kind]
-
-    def get_strikes_dict(
-        self,
-        instruments: list[Symbol],
-    ) -> dict[str, dict[str, Decimal | None]]:
-        """
-        Get a dict with strike prices as keys.
-        """
-        response: dict[str, dict[str, Decimal | None]] = {}
-
-        for i in range(len(instruments)):
-            element = instruments[i]
-            strike = f'{str(element).split('-')[1]}'
-            response[f'{strike}'] = {
-                'C': None,
-                'P': None,
-            }
-
-        return response
-
    async def submit_limit(
        self,
        symbol: str,
@@ -500,19 +358,6 @@ class Client:
            return cached_pair

        if sym:
-            opt: OptionPair|None = pair_table.get(sym)
-            if not opt:
-                closest_matches: dict[str, Pair] = match_from_pairs(
-                    pairs=pair_table,
-                    query=sym,
-                    score_cutoff=40,
-                )
-                closest_syms: list[str] = list(closest_matches.keys())
-                raise ValueError(
-                    f'No contract found for {sym!r}\n\n'
-                    f'Closest {len(closest_syms)} available contracts:\n\n'
-                    f'{pformat(closest_syms)}\n'
-                )
            return pair_table[sym]
        else:
            return self._pairs
@@ -536,7 +381,7 @@ class Client:
        params: dict[str, str] = {
            'currency': currency.upper(),
            'kind': kind,
-            'expired': expired,
+            'expired': str(expired).lower()
        }

        resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
@@ -544,9 +389,9 @@ class Client:
            params,
        )
        # convert to symbol-keyed table
-        pair_type: Pair = PAIRTYPES[kind]
+        pair_type: Type = PAIRTYPES[kind]
        results: list[dict] | None = resp.result
        instruments: dict[str, Pair] = {}
        for item in results:
            symbol=item['instrument_name'].lower()
@@ -582,15 +427,12 @@ class Client:
        mkt_pairs = await self.symbol_info()
        if not mkt_pairs:
-            raise SymbolNotFound(
-                f'No market pairs found!?:\n'
-                f'{mkt_pairs}'
-            )
+            raise SymbolNotFound(f'No market pairs found!?:\n{resp}')

        pairs_view_subtable: dict[str, Pair] = {}

        for instrument in mkt_pairs:
-            pair_type: Pair|OptionPair = PAIRTYPES[venue]
+            pair_type: Type = PAIRTYPES[venue]
            pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
@@ -638,14 +480,12 @@ class Client:
        if end_dt is None:
            end_dt = now('UTC')

-        _orig_start_dt = start_dt
        if start_dt is None:
            start_dt = end_dt.start_of(
-                'minute'
-            ).subtract(minutes=limit)
+                'minute').subtract(minutes=limit)

-        start_time: int = deribit_timestamp(start_dt)
-        end_time: int = deribit_timestamp(end_dt)
+        start_time = deribit_timestamp(start_dt)
+        end_time = deribit_timestamp(end_dt)

        # https://docs.deribit.com/#public-get_tradingview_chart_data
        resp = await self._json_rpc_auth_wrapper(
@@ -659,13 +499,9 @@ class Client:
        result = KLinesResult(**resp.result)
        new_bars: list[tuple] = []
-        # if _orig_start_dt is None:
-        #     if not new_bars:
-        #         import tractor
-        #         await tractor.pause()

        for i in range(len(result.close)):
            row = [
                (start_time + (i * (60 * 1000))) / 1000.0,  # time
                result.open[i],
                result.high[i],
@@ -718,7 +554,7 @@ async def get_client(

 @acm
-async def open_feed_handler() -> FeedHandler:
+async def open_feed_handler():
    fh = FeedHandler(config=get_config())
    yield fh
    await to_asyncio.run_task(fh.stop_async)
@@ -739,37 +575,43 @@ async def aio_price_feed_relay(
    from_trio: asyncio.Queue,
    to_trio: trio.abc.SendChannel,
 ) -> None:
-    '''
-    Relay price feed quotes from the `cryptofeed.FeedHandler` to
-    the `piker`-side `trio.task` consumers for delivery to consumer
-    sub-actors for various subsystems.
-
-    '''
-    async def _trade(
-        trade: Trade,  # cryptofeed, NOT ours from `.venues`!
-        receipt_timestamp: int,
-    ) -> None:
-        '''
-        Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
-
-        '''
-        to_trio.send_nowait(('trade', trade))
-
-    async def _l1(
-        book: L1Book,
-        receipt_timestamp: int,
-    ) -> None:
-        '''
-        Relay-thru "l1 book" updates.
-
-        '''
-        to_trio.send_nowait(('l1', book))
-        # TODO, make this work!
-        # -[ ] why isn't this working in `tractor.pause_from_sync()`??
-        # breakpoint()
+    async def _trade(data: dict, receipt_timestamp):
+        to_trio.send_nowait(('trade', {
+            'symbol': cb_sym_to_deribit_inst(
+                str_to_cb_sym(data.symbol)).lower(),
+            'last': data,
+            'broker_ts': time.time(),
+            'data': data.to_dict(),
+            'receipt': receipt_timestamp
+        }))
+
+    async def _l1(data: dict, receipt_timestamp):
+        to_trio.send_nowait(('l1', {
+            'symbol': cb_sym_to_deribit_inst(
+                str_to_cb_sym(data.symbol)).lower(),
+            'ticks': [
+                {
+                    'type': 'bid',
+                    'price': float(data.bid_price),
+                    'size': float(data.bid_size)
+                },
+                {
+                    'type': 'bsize',
+                    'price': float(data.bid_price),
+                    'size': float(data.bid_size)
+                },
+                {
+                    'type': 'ask',
+                    'price': float(data.ask_price),
+                    'size': float(data.ask_size)
+                },
+                {
+                    'type': 'asize',
+                    'price': float(data.ask_price),
+                    'size': float(data.ask_size)
+                }
+            ]
+        }))

    sym: Symbol = piker_sym_to_cb_sym(instrument)
    fh.add_feed(
        DERIBIT,
@@ -783,35 +625,27 @@ async def aio_price_feed_relay(
    if not fh.running:
        fh.run(
            start_loop=False,
-            install_signal_handlers=False
-        )
+            install_signal_handlers=False)

    # sync with trio
    to_trio.send_nowait(None)

-    # run until cancelled
    await asyncio.sleep(float('inf'))


 @acm
 async def open_price_feed(
    instrument: str
-) -> to_asyncio.LinkedTaskChannel:
-
-    fh: FeedHandler
-    first: None
-    chan: to_asyncio.LinkedTaskChannel
-    async with (
-        maybe_open_feed_handler() as fh,
-        to_asyncio.open_channel_from(
+) -> trio.abc.ReceiveStream:
+    async with maybe_open_feed_handler() as fh:
+        async with to_asyncio.open_channel_from(
            partial(
                aio_price_feed_relay,
                fh,
                instrument
            )
-        ) as (first, chan)
-    ):
-        yield chan
+        ) as (first, chan):
+            yield chan


 @acm
@@ -820,7 +654,6 @@ async def maybe_open_price_feed(
 ) -> trio.abc.ReceiveStream:

    # TODO: add a predicate to maybe_open_context
-    feed: to_asyncio.LinkedTaskChannel
    async with maybe_open_context(
        acm_func=open_price_feed,
        kwargs={
@@ -834,180 +667,69 @@ async def maybe_open_price_feed(
        yield feed


-async def aio_open_interest_feed_relay(
+async def aio_order_feed_relay(
    fh: FeedHandler,
-    instruments: list[Symbol],
+    instrument: Symbol,
    from_trio: asyncio.Queue,
    to_trio: trio.abc.SendChannel,
 ) -> None:
-    async def _trade(
-        trade: Trade,  # cryptofeed, NOT ours from `.venues`!
-        receipt_timestamp: int,
-    ) -> None:
-        '''
-        Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side.
-
-        '''
-        to_trio.send_nowait(('trade', trade))
-
-    # trade and oi are user defined functions that
-    # will be called when trade and open interest updates are received
-    # data type is not dict, is an object: cryptofeed.types.OpenINterest
-    async def _oi(
-        oi: OpenInterest,
-        receipt_timestamp: int,
-    ) -> None:
-        '''
-        Proxy-thru `cryptofeed.FeedHandler` "oi" to `piker`-side.
-
-        '''
-        symbol: Symbol = str_to_cb_sym(oi.symbol)
-        piker_sym: str = cb_sym_to_deribit_inst(symbol)
-        (
-            base,
-            expiry_date,
-            strike_price,
-            option_type
-        ) = tuple(
-            piker_sym.split('-')
-        )
-        msg = {
-            'timestamp': oi.timestamp,
-            'strike_price': strike_price,
-            'option_type': option_type,
-            'open_interest': Decimal(oi.open_interest),
-        }
-        to_trio.send_nowait(('oi', msg))
-
-    channels = [TRADES, OPEN_INTEREST]
-    callbacks={TRADES: _trade, OPEN_INTEREST: _oi}
+    async def _fill(data: dict, receipt_timestamp):
+        breakpoint()
+
+    async def _order_info(data: dict, receipt_timestamp):
+        breakpoint()

    fh.add_feed(
        DERIBIT,
-        channels=channels,
-        symbols=instruments,
-        callbacks=callbacks
-    )
+        channels=[FILLS, ORDER_INFO],
+        symbols=[instrument.upper()],
+        callbacks={
+            FILLS: _fill,
+            ORDER_INFO: _order_info,
+        })

    if not fh.running:
        fh.run(
            start_loop=False,
-            install_signal_handlers=False
-        )
+            install_signal_handlers=False)

    # sync with trio
    to_trio.send_nowait(None)

-    # run until cancelled
    await asyncio.sleep(float('inf'))


 @acm
-async def open_oi_feed(
-    instruments: list[Symbol],
-) -> to_asyncio.LinkedTaskChannel:
-
-    fh: FeedHandler
-    first: None
-    chan: to_asyncio.LinkedTaskChannel
-    async with (
-        maybe_open_feed_handler() as fh,
-        to_asyncio.open_channel_from(
+async def open_order_feed(
+    instrument: list[str]
+) -> trio.abc.ReceiveStream:
+    async with maybe_open_feed_handler() as fh:
+        async with to_asyncio.open_channel_from(
            partial(
-                aio_open_interest_feed_relay,
+                aio_order_feed_relay,
                fh,
-                instruments,
+                instrument
            )
-        ) as (first, chan)
-    ):
-        yield chan
+        ) as (first, chan):
+            yield chan


 @acm
-async def maybe_open_oi_feed(
-    instruments: list[Symbol],
+async def maybe_open_order_feed(
+    instrument: str
 ) -> trio.abc.ReceiveStream:

    # TODO: add a predicate to maybe_open_context
-    feed: to_asyncio.LinkedTaskChannel
    async with maybe_open_context(
-        acm_func=open_oi_feed,
+        acm_func=open_order_feed,
        kwargs={
-            'instruments': instruments
+            'instrument': instrument.split('.')[0],
+            'fh': fh
        },
-        key=f'{instruments[0].base}',
+        key=f'{instrument.split('.')[0]}-order',
    ) as (cache_hit, feed):
        if cache_hit:
            yield broadcast_receiver(feed, 10)
        else:
            yield feed
-
-
-# TODO, move all to `.broker` submod!
-# async def aio_order_feed_relay(
-#     fh: FeedHandler,
-#     instrument: Symbol,
-#     from_trio: asyncio.Queue,
-#     to_trio: trio.abc.SendChannel,
-# ) -> None:
-#     async def _fill(data: dict, receipt_timestamp):
-#         breakpoint()
-#     async def _order_info(data: dict, receipt_timestamp):
-#         breakpoint()
-
-#     fh.add_feed(
-#         DERIBIT,
-#         channels=[FILLS, ORDER_INFO],
-#         symbols=[instrument.upper()],
-#         callbacks={
-#             FILLS: _fill,
-#             ORDER_INFO: _order_info,
-#         })
-
-#     if not fh.running:
-#         fh.run(
-#             start_loop=False,
-#             install_signal_handlers=False)
-
-#     # sync with trio
-#     to_trio.send_nowait(None)
-
-#     await asyncio.sleep(float('inf'))
-
-# @acm
-# async def open_order_feed(
-#     instrument: list[str]
-# ) -> trio.abc.ReceiveStream:
-#     async with maybe_open_feed_handler() as fh:
-#         async with to_asyncio.open_channel_from(
-#             partial(
-#                 aio_order_feed_relay,
-#                 fh,
-#                 instrument
-#             )
-#         ) as (first, chan):
-#             yield chan
-
-# @acm
-# async def maybe_open_order_feed(
-#     instrument: str
-# ) -> trio.abc.ReceiveStream:
-
-#     # TODO: add a predicate to maybe_open_context
-#     async with maybe_open_context(
-#         acm_func=open_order_feed,
-#         kwargs={
-#             'instrument': instrument.split('.')[0],
-#             'fh': fh
-#         },
-#         key=f'{instrument.split('.')[0]}-order',
-#     ) as (cache_hit, feed):
-#         if cache_hit:
-#             yield broadcast_receiver(feed, 10)
-#         else:
-#             yield feed
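Reviewer note: the `_json_rpc_auth_wrapper` described in the commit message (acquire an initial access token, refresh it shortly before it expires, then forward the wrapped call) follows a standard pattern. A minimal sketch, assuming stand-in names (`client.auth`, `client.json_rpc`, `client.key_id`/`key_secret`); the `public/auth` method and its `grant_type`/`refresh_token`/`expires_in` fields are from Deribit's documented API, but this is an illustration, not the diff's exact implementation:

```python
import time

class AuthState:
    '''Minimal token cache mirroring what the wrapped client tracks.'''
    access_token: str | None = None
    refresh_token: str | None = None
    expires_at: float = 0.0

async def json_rpc_auth_wrapper(client, method: str, params: dict):
    '''Ensure a valid access token before forwarding any json-rpc call.'''
    st: AuthState = client.auth
    now = time.time()
    if st.access_token is None:
        # first call: acquire an initial token pair.
        res = await client.json_rpc('public/auth', {
            'grant_type': 'client_credentials',
            'client_id': client.key_id,
            'client_secret': client.key_secret,
        })
        st.access_token = res.result['access_token']
        st.refresh_token = res.result['refresh_token']
        st.expires_at = now + res.result['expires_in']

    elif now > st.expires_at - 5:  # renew a few seconds early
        res = await client.json_rpc('public/auth', {
            'grant_type': 'refresh_token',
            'refresh_token': st.refresh_token,
        })
        st.access_token = res.result['access_token']
        st.refresh_token = res.result['refresh_token']
        st.expires_at = now + res.result['expires_in']

    return await client.json_rpc(method, params)
```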

View File

@@ -18,58 +18,56 @@
 Deribit backend.

 '''
-from __future__ import annotations
 from contextlib import asynccontextmanager as acm
 from datetime import datetime
-from typing import (
-    # Any,
-    # Optional,
-    Callable,
-)
-# from pprint import pformat
+from typing import Any, Optional, Callable
+from pprint import pformat
 import time

-import cryptofeed
 import trio
 from trio_typing import TaskStatus
 from pendulum import (
     from_timestamp,
-    now,
 )
+from rapidfuzz import process as fuzzy
 import numpy as np
 import tractor

 from piker.accounting import (
-    Asset,
     MktPair,
     unpack_fqme,
 )
 from piker.brokers import (
     open_cached_client,
     NoData,
+    DataUnavailable,
 )
 from piker._cacheables import (
     async_lifo_cache,
 )
-from piker.log import (
-    get_logger,
-    mk_repr,
-)
+from piker.log import get_logger, get_console_log
+from piker.data import ShmArray
 from piker.data.validate import FeedInit
-from piker.brokers._util import (
-    BrokerError,
-    DataUnavailable,
-)

+from cryptofeed import FeedHandler
+from cryptofeed.defines import (
+    DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT
+)
+from cryptofeed.symbols import Symbol

 from .api import (
-    Client,
-    # get_config,
-    piker_sym_to_cb_sym,
-    cb_sym_to_deribit_inst,
-    str_to_cb_sym,
+    Client, Trade,
+    get_config,
+    piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
     maybe_open_price_feed
 )
 from .venues import (
     Pair,
     OptionPair,
-    Trade,
 )

 _spawn_kwargs = {
@@ -88,10 +86,6 @@ async def open_history_client(
    # TODO implement history getter for the new storage layer.
    async with open_cached_client('deribit') as client:

-        pair: OptionPair = client._pairs[mkt.dst.name]
-        # XXX NOTE, the cuckers use ms !!!
-        creation_time_s: int = pair.creation_timestamp/1000
-
        async def get_ohlc(
            timeframe: float,
            end_dt: datetime | None = None,
@@ -111,31 +105,6 @@ async def open_history_client(
                end_dt=end_dt,
            )
            if len(array) == 0:
-                if (
-                    end_dt is None
-                ):
-                    raise DataUnavailable(
-                        'No history seems to exist yet?\n\n'
-                        f'{mkt}'
-                    )
-                elif (
-                    end_dt
-                    and
-                    end_dt.timestamp() < creation_time_s
-                ):
-                    # the contract can't have history
-                    # before it was created.
-                    pair_type_str: str = type(pair).__name__
-                    create_dt: datetime = from_timestamp(creation_time_s)
-                    raise DataUnavailable(
-                        f'No history prior to\n'
-                        f'`{pair_type_str}.creation_timestamp: int = '
-                        f'{pair.creation_timestamp}\n\n'
-                        f'------ deribit sux ------\n'
-                        f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n'
-                        f'creation_time_s: {creation_time_s}\n'
-                        f'create_dt: {create_dt}\n'
-                    )
                raise NoData(
                    f'No frame for {start_dt} -> {end_dt}\n'
                )
@@ -157,20 +126,14 @@ async def open_history_client(
            return array, start_dt, end_dt

-        yield (
-            get_ohlc,
-            {  # backfill config
-                'erlangs': 3,
-                'rate': 3,
-            }
-        )
+        yield get_ohlc, {'erlangs': 3, 'rate': 3}


 @async_lifo_cache()
 async def get_mkt_info(
    fqme: str,
-) -> tuple[MktPair, Pair|OptionPair] | None:
+) -> tuple[MktPair, Pair] | None:

    # uppercase since kraken bs_mktid is always upper
    if 'deribit' not in fqme.lower():
@@ -186,7 +149,7 @@ async def get_mkt_info(
    # returns, always!
    expiry: str = expiry.upper()
    venue: str = venue.upper()
-    # venue_lower: str = venue.lower()
+    venue_lower: str = venue.lower()

    mkt_mode: str = 'option'
@@ -212,88 +175,64 @@ async def get_mkt_info(
        price_tick=pair.price_tick,
        size_tick=pair.size_tick,
        bs_mktid=pair.symbol,
+        expiry=pair.expiry,
        venue=mkt_mode,
        broker='deribit',
        _atype=mkt_mode,
        _fqme_without_src=True,
-
-        # expiry=pair.expiry,
-        # XXX TODO, currently we don't use it since it's
-        # already "described" in the `OptionPair.symbol: str`
-        # and if we slap in the ISO repr it's kinda hideous..
-        # -[ ] figure out the best either std
    )
    return mkt, pair


 async def stream_quotes(
    send_chan: trio.abc.SendChannel,
    symbols: list[str],
    feed_is_live: trio.Event,
+    loglevel: str = None,

    # startup sync
    task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,

 ) -> None:
-    '''
-    Open a live quote stream for the market set defined by `symbols`.
-
-    Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side
-    task and relays through L1 and `Trade` msgs here to our `trio.Task`.
-
-    '''
+    # XXX: required to propagate ``tractor`` loglevel to piker logging
+    get_console_log(loglevel or tractor.current_actor().loglevel)
+
    sym = symbols[0].split('.')[0]
-    init_msgs: list[FeedInit] = []

-    # multiline nested `dict` formatter (since rn quote-msgs are
-    # just that).
-    pfmt: Callable[[str], str] = mk_repr(
-        # so we can see `deribit`'s delightfully mega-long bs fields..
-        maxstring=100,
-    )
+    init_msgs: list[FeedInit] = []

    async with (
        open_cached_client('deribit') as client,
        send_chan as send_chan
    ):
-        mkt: MktPair
-        pair: Pair
        mkt, pair = await get_mkt_info(sym)

        # build out init msgs according to latest spec
        init_msgs.append(
-            FeedInit(
-                mkt_info=mkt,
-            )
+            FeedInit(mkt_info=mkt)
        )
-        # build `cryptofeed` feed-handle
-        cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym)
+        nsym = piker_sym_to_cb_sym(sym)

-        from_cf: tractor.to_asyncio.LinkedTaskChannel
-        async with maybe_open_price_feed(sym) as from_cf:
+        async with maybe_open_price_feed(sym) as stream:

-            # load the "last trades" summary
-            last_trades_res: cryptofeed.LastTradesResult = await client.last_trades(
-                cb_sym_to_deribit_inst(cf_sym),
-                count=1,
-            )
-            last_trades: list[Trade] = last_trades_res.trades
+            cache = client._pairs

-            # TODO, do we even need this or will the above always
-            # work?
-            # if not last_trades:
-            #     await tractor.pause()
-            #     async for typ, quote in from_cf:
-            #         if typ == 'trade':
-            #             last_trade = Trade(**(quote['data']))
-            #             break
+            last_trades = (await client.last_trades(
+                cb_sym_to_deribit_inst(nsym), count=1)).trades

-            # else:
-            last_trade = Trade(
-                **(last_trades[0])
-            )
+            if len(last_trades) == 0:
+                last_trade = None
+                async for typ, quote in stream:
+                    if typ == 'trade':
+                        last_trade = Trade(**(quote['data']))
+                        break

-            first_quote: dict = {
+            else:
+                last_trade = Trade(**(last_trades[0]))

+            first_quote = {
                'symbol': sym,
                'last': last_trade.price,
                'brokerd_ts': last_trade.timestamp,
@@ -304,84 +243,13 @@ async def stream_quotes(
                    'broker_ts': last_trade.timestamp
                }]
            }
-            task_status.started((
-                init_msgs,
-                first_quote,
-            ))
+            task_status.started((init_msgs, first_quote))

            feed_is_live.set()

-            # NOTE XXX, static for now!
-            # => since this only handles ONE mkt feed at a time we
-            # don't need a lookup table to map interleaved quotes
-            # from multiple possible mkt-pairs
-            topic: str = mkt.bs_fqme
-
-            # deliver until cancelled
-            async for typ, ref in from_cf:
-                match typ:
-                    case 'trade':
-                        trade: cryptofeed.types.Trade = ref
-
-                        # TODO, re-impl this according to teh ideal
-                        # fqme for opts that we choose!!
-                        bs_fqme: str = cb_sym_to_deribit_inst(
-                            str_to_cb_sym(trade.symbol)
-                        ).lower()
-
-                        piker_quote: dict = {
-                            'symbol': bs_fqme,
-                            'last': trade.price,
-                            'broker_ts': time.time(),
-                            # ^TODO, name this `brokerd/datad_ts` and
-                            # use `time.time_ns()` ??
-                            'ticks': [{
-                                'type': 'trade',
-                                'price': float(trade.price),
-                                'size': float(trade.amount),
-                                'broker_ts': trade.timestamp,
-                            }],
-                        }
-                        log.info(
-                            f'deribit {typ!r} quote for {sym!r}\n\n'
-                            f'{trade}\n\n'
-                            f'{pfmt(piker_quote)}\n'
-                        )
-
-                    case 'l1':
-                        book: cryptofeed.types.L1Book = ref
-
-                        # TODO, so this is where we can possibly change things
-                        # and instead lever the `MktPair.bs_fqme: str` output?
-                        bs_fqme: str = cb_sym_to_deribit_inst(
-                            str_to_cb_sym(book.symbol)
-                        ).lower()
-
-                        piker_quote: dict = {
-                            'symbol': bs_fqme,
-                            'ticks': [
-                                {'type': 'bid',
-                                 'price': float(book.bid_price),
-                                 'size': float(book.bid_size)},
-                                {'type': 'bsize',
-                                 'price': float(book.bid_price),
-                                 'size': float(book.bid_size),},
-                                {'type': 'ask',
-                                 'price': float(book.ask_price),
-                                 'size': float(book.ask_size),},
-                                {'type': 'asize',
-                                 'price': float(book.ask_price),
-                                 'size': float(book.ask_size),}
-                            ]
-                        }
-
-                await send_chan.send({
-                    topic: piker_quote,
-                })
+            async for typ, quote in stream:
+                topic = quote['symbol']
+                await send_chan.send({topic: quote})


 @tractor.context
@@ -391,13 +259,13 @@ async def open_symbol_search(
    async with open_cached_client('deribit') as client:

        # load all symbols locally for fast search
-        # cache = client._pairs
+        cache = client._pairs
        await ctx.started()

        async with ctx.open_stream() as stream:
            pattern: str
            async for pattern in stream:
                # NOTE: pattern fuzzy-matching is done within
                # the methd impl.
                pairs: dict[str, Pair] = await client.search_symbols(

View File

@@ -22,10 +22,11 @@ from __future__ import annotations
 import pendulum
 from typing import (
     Literal,
     Optional,
 )
 from decimal import Decimal

+from msgspec import field

 from piker.types import Struct
@@ -110,21 +111,18 @@ class OptionPair(Pair, frozen=True):
     block_trade_min_trade_amount: int  # '25'
     block_trade_commission: float  # '0.003'

     # NOTE: see `.data._symcache.SymbologyCache.load()` for why
     ns_path: str = 'piker.brokers.deribit:OptionPair'

-    # TODO, impl this without the MM:SS part of
-    # the `'THH:MM:SS..'` etc..
     @property
     def expiry(self) -> str:
-        iso_date = pendulum.from_timestamp(
-            self.expiration_timestamp / 1000
-        ).isoformat()
+        iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
         return iso_date

     @property
     def venue(self) -> str:
-        return f'{self.instrument_type}_option'
+        return 'option'

     @property
     def bs_fqme(self) -> str:
@@ -154,7 +152,6 @@ class JSONRPCResult(Struct):
     error: Optional[dict] = None
     result: Optional[list[dict]] = None

-
 class JSONRPCChannel(Struct):
     method: str
     params: dict
@@ -171,7 +168,6 @@ class KLinesResult(Struct):
     status: str
     volume: list[float]

-
 class Trade(Struct):
     iv: float
     price: float
@@ -190,7 +186,6 @@ class Trade(Struct):
     block_trade_id: Optional[str] = '',
     block_trade_leg_count: Optional[int] = 0,

-
 class LastTradesResult(Struct):
     trades: list[Trade]
     has_more: bool
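Reviewer note: the `expiry` property pattern above — a `msgspec`-style struct exposing Deribit's millisecond `expiration_timestamp` as an ISO date — works standalone. A minimal sketch with a hypothetical pair (the field subset and sample timestamp are illustrative):

```python
import pendulum
from msgspec import Struct

class MiniOptionPair(Struct, frozen=True):
    # hypothetical subset of the real `OptionPair` fields
    symbol: str
    expiration_timestamp: int  # epoch ms, as deribit reports it

    @property
    def expiry(self) -> str:
        # deribit timestamps are in milliseconds
        return pendulum.from_timestamp(
            self.expiration_timestamp / 1000
        ).isoformat()

pair = MiniOptionPair(
    symbol='BTC-27JUN25-60000-C',
    expiration_timestamp=1_751_011_200_000,
)
print(pair.expiry)  # -> '2025-06-27T08:00:00+00:00'
```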

View File

@@ -30,7 +30,6 @@ import time
 from typing import (
     Any,
     AsyncIterator,
-    Callable,
     TYPE_CHECKING,
 )
@@ -55,9 +54,6 @@ from ._util import (
     get_console_log,
 )
 from ..service import maybe_spawn_daemon
-from piker.log import (
-    mk_repr,
-)

 if TYPE_CHECKING:
     from ._sharedmem import (
@@ -579,6 +575,7 @@ async def open_sample_stream(

 async def sample_and_broadcast(
     bus: _FeedsBus,  # noqa
     rt_shm: ShmArray,
     hist_shm: ShmArray,
@@ -599,22 +596,11 @@ async def sample_and_broadcast(
     overruns = Counter()

-    # multiline nested `dict` formatter (since rn quote-msgs are
-    # just that).
-    pfmt: Callable[[str], str] = mk_repr()
-
     # iterate stream delivered by broker
     async for quotes in quote_stream:
-        # XXX WARNING XXX only enable for debugging bc ow can cost
-        # ALOT of perf with HF-feedz!!!
-        #
-        # log.info(
-        #     'Rx live quotes:\n'
-        #     f'{pfmt(quotes)}'
-        # )
-
-        # TODO: `numba` this!
+        # print(quotes)
+
+        # TODO: ``numba`` this!
         for broker_symbol, quote in quotes.items():
             # TODO: in theory you can send the IPC msg *before* writing
             # to the sharedmem array to decrease latency, however, that
@@ -687,18 +673,6 @@ async def sample_and_broadcast(
             sub_key: str = broker_symbol.lower()
             subs: set[Sub] = bus.get_subs(sub_key)

-            if not subs:
-                all_bs_fqmes: list[str] = list(
-                    bus._subscribers.keys()
-                )
-                log.warning(
-                    f'No subscribers for {brokername!r} live-quote ??\n'
-                    f'broker_symbol: {broker_symbol}\n\n'
-                    f'Maybe the backend-sys symbol does not match one of,\n'
-                    f'{pfmt(all_bs_fqmes)}\n'
-                )
-
             # NOTE: by default the broker backend doesn't append
             # it's own "name" into the fqme schema (but maybe it
             # should?) so we have to manually generate the correct
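Reviewer note: the `bus.get_subs()` fan-out this hunk touches — quotes keyed by the backend symbol, relayed to every registered subscriber — reduces to a simple pattern. A toy sketch; all names here are illustrative stand-ins for the real `_FeedsBus` internals:

```python
import trio

# per-symbol subscriber registry (sub-keys are lowercased fqme-ish strs)
subscribers: dict[str, set[trio.MemorySendChannel]] = {}

def get_subs(sub_key: str) -> set:
    return subscribers.setdefault(sub_key.lower(), set())

async def sample_and_broadcast(quote_stream) -> None:
    # iterate quote batches delivered by a broker backend..
    async for quotes in quote_stream:
        for broker_symbol, quote in quotes.items():
            # ..and relay each quote to every registered subscriber.
            for send_chan in get_subs(broker_symbol):
                await send_chan.send({broker_symbol.lower(): quote})
```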

View File

@@ -360,7 +360,7 @@ async def open_autorecon_ws(

 '''
 JSONRPC response-request style machinery for transparent multiplexing
-of msgs over a `NoBsWs`.
+of msgs over a NoBsWs.

 '''
@@ -377,16 +377,6 @@ async def open_jsonrpc_session(
     url: str,
     start_id: int = 0,
     response_type: type = JSONRPCResult,
-    msg_recv_timeout: float = float('inf'),
-    # ^NOTE, since only `deribit` is using this jsonrpc stuff atm
-    # and options mkts are generally "slow moving"..
-    #
-    # FURTHER if we break the underlying ws connection then since we
-    # don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
-    # `_reconnect_forever()`, the jsonrpc "transport pipe" get's
-    # broken and never restored with wtv init sequence is required to
-    # re-establish a working req-resp session.
-
     # request_type: Optional[type] = None,
     # request_hook: Optional[Callable] = None,
     # error_hook: Optional[Callable] = None,
@@ -398,18 +388,12 @@ async def open_jsonrpc_session(
     async with (
         trio.open_nursery() as n,
-        open_autorecon_ws(
-            url=url,
-            msg_recv_timeout=msg_recv_timeout,
-        ) as ws
+        open_autorecon_ws(url) as ws
     ):
         rpc_id: Iterable[int] = count(start_id)
         rpc_results: dict[int, dict] = {}

-        async def json_rpc(
-            method: str,
-            params: dict,
-        ) -> dict:
+        async def json_rpc(method: str, params: dict) -> dict:
             '''
             perform a json rpc call and wait for the result, raise exception in
             case of error field present on response
@@ -499,7 +483,7 @@ async def open_jsonrpc_session(
                     # response in original "result" msg,
                     # THEN FINALLY set the event to signal caller
                     # to raise the error in the parent task.
-                    req_id: int = msg['id']
+                    req_id: int = error['id']
                     req_msg: dict = req_msgs[req_id]
                     result: dict = rpc_results[req_id]
                     result['error'] = error
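Reviewer note: the req-id bookkeeping this hunk (and the "raise jsonrpc errors in parent task" commit) revolves around is roughly: each outbound request parks a `trio.Event`; the receiver task stores the response *or* error under the same id and sets the event, so the exception is raised in the *calling* task. A minimal sketch under assumed names (`ws_send` is any async send callable):

```python
import trio

rpc_results: dict[int, dict] = {}

async def json_rpc_call(ws_send, req_id: int, method: str, params: dict) -> dict:
    # park an event the receiver task will set once a response
    # (or error) tagged with our id comes back over the ws.
    rpc_results[req_id] = {
        'result': None,
        'error': None,
        'event': trio.Event(),
    }
    await ws_send({
        'jsonrpc': '2.0',
        'id': req_id,
        'method': method,
        'params': params,
    })
    entry = rpc_results[req_id]
    await entry['event'].wait()
    if entry['error'] is not None:
        # raised here, in the requesting (parent) task.
        raise RuntimeError(entry['error'])
    return entry['result']

def on_ws_msg(msg: dict) -> None:
    # receiver side: match by id, stash the payload, wake the caller.
    entry = rpc_results.get(msg['id'])
    if entry is None:
        return
    if 'error' in msg:
        entry['error'] = msg['error']
    else:
        entry['result'] = msg.get('result')
    entry['event'].set()
```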

View File

@@ -540,10 +540,7 @@ async def open_feed_bus(
         # subscription since the backend isn't (yet) expected to
         # append it's own name to the fqme, so we filter on keys
         # which *do not* include that name (e.g .ib) .
-        bus._subscribers.setdefault(
-            bs_fqme,
-            set(),
-        )
+        bus._subscribers.setdefault(bs_fqme, set())

         # sync feed subscribers with flume handles
         await ctx.started(

View File

@@ -18,11 +18,7 @@
 Log like a forester!

 """
 import logging
-import reprlib
 import json
-from typing import (
-    Callable,
-)

 import tractor
 from pygments import (
@@ -88,27 +84,3 @@ def colorize_json(
         # likeable styles: algol_nu, tango, monokai
         formatters.TerminalTrueColorFormatter(style=style)
     )
-
-
-def mk_repr(
-    **repr_kws,
-) -> Callable[[str], str]:
-    '''
-    Allocate and deliver a `repr.Repr` instance with provided input
-    settings using the std-lib's `reprlib` mod,
-     * https://docs.python.org/3/library/reprlib.html
-
-    ------ Ex. ------
-    An up to 6-layer-nested `dict` as multi-line:
-    - https://stackoverflow.com/a/79102479
-    - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
-
-    '''
-    def_kws: dict[str, int] = dict(
-        indent=2,
-        maxlevel=6,   # recursion levels
-        maxstring=66, # match editor line-len limit
-    )
-    def_kws |= repr_kws
-    reprr = reprlib.Repr(**def_kws)
-    return reprr.repr
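Reviewer note: the `mk_repr` helper this diff adds/removes is a thin wrapper over the std-lib `reprlib`; its effect can be reproduced directly (the constructor kwargs and `indent` attribute require Python 3.12+, which matches the repo's `python312` pin; the sample quote dict is made up):

```python
import reprlib

# same defaults the helper uses: 2-space indent, up to 6 nested
# levels, strings clipped at 66 chars.
reprr = reprlib.Repr(
    indent=2,
    maxlevel=6,
    maxstring=66,
)
pfmt = reprr.repr

quote = {'symbol': 'btc-27jun25-60000-c', 'ticks': [{'type': 'bid', 'price': 59_000.5}]}
print(pfmt(quote))  # multi-line, depth-limited rendering
```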

View File

@@ -161,12 +161,7 @@ class NativeStorageClient:
     def index_files(self):
         for path in self._datadir.iterdir():
-            if (
-                path.name in {'borked', 'expired',}
-                or
-                '.parquet' not in str(path)
-            ):
-                # ignore all non-apache files (for now)
+            if path.name in {'borked', 'expired',}:
                 continue

             key: str = path.name.rstrip('.parquet')
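Reviewer note: the left-hand change makes `index_files()` skip anything that isn't a `.parquet` file. The filter on its own, with a caveat worth flagging (function name is a stand-in):

```python
from pathlib import Path

def iter_parquet_keys(datadir: Path) -> list[str]:
    keys: list[str] = []
    for path in datadir.iterdir():
        if (
            path.name in {'borked', 'expired',}
            or '.parquet' not in str(path)
        ):
            # ignore all non-apache files (for now)
            continue
        # NB: `str.rstrip` strips a *character set*, matching the
        # original's behaviour -- it can also eat trailing 'p'/'a'/
        # 'r'/'q'/'u'/'e'/'t'/'.' chars from the stem itself;
        # `path.stem` would be the safer spelling.
        keys.append(path.name.rstrip('.parquet'))
    return keys
```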

View File

@ -44,10 +44,8 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pendulum import ( from pendulum import (
Interval,
DateTime, DateTime,
Duration, Duration,
duration as mk_duration,
from_timestamp, from_timestamp,
) )
import numpy as np import numpy as np
@ -216,8 +214,7 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling? # pair, immediately stop backfilling?
if ( if (
start_dt start_dt
and and end_dt < start_dt
end_dt < start_dt
): ):
await tractor.pause() await tractor.pause()
break break
@ -265,7 +262,6 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled: except tractor.ContextCancelled:
# log.exception # log.exception
await tractor.pause() await tractor.pause()
raise
null_segs_detected.set() null_segs_detected.set()
# RECHECK for more null-gaps # RECHECK for more null-gaps
@ -353,7 +349,7 @@ async def maybe_fill_null_segments(
async def start_backfill( async def start_backfill(
get_hist, get_hist,
def_frame_duration: Duration, frame_types: dict[str, Duration] | None,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
@ -383,23 +379,22 @@ async def start_backfill(
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if backfill_until_dt is None: if backfill_until_dt is None:
# TODO: per-provider default history-durations? # TODO: drop this right and just expose the backfill
# -[ ] inside the `open_history_client()` config allow # limits inside a [storage] section in conf.toml?
# declaring the history duration limits instead of # when no tsdb "last datum" is provided, we just load
# guessing and/or applying the same limits to all? # some near-term history.
# # periods = {
# -[ ] allow declaring (default) per-provider backfill # 1: {'days': 1},
# limits inside a [storage] sub-section in conf.toml? # 60: {'days': 14},
# # }
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently # do a decently sized backfill and load it into storage.
# large" 60s duration limit and a much shorter 1s range.
periods = { periods = {
1: {'days': 2}, 1: {'days': 2},
60: {'years': 6}, 60: {'years': 6},
} }
period_duration: int = periods[timeframe] period_duration: int = periods[timeframe]
update_start_on_prepend: bool = True update_start_on_prepend = True
# NOTE: manually set the "latest" datetime which we intend to # NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history # backfill history "until" so as to adhere to the history
@ -421,6 +416,7 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n' f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n' f'last_start_dt: {last_start_dt}\n'
) )
try: try:
( (
array, array,
@@ -430,58 +426,37 @@ async def start_backfill(
                     timeframe,
                     end_dt=last_start_dt,
                 )
             except NoData as _daterr:
-                orig_last_start_dt: datetime = last_start_dt
-                gap_report: str = (
-                    f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
-                    f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
-                    f'last_start_dt: {orig_last_start_dt}\n\n'
-                    f'bf_until: {backfill_until_dt}\n'
-                )
-                # EMPTY FRAME signal with 3 (likely) causes:
-                #
-                # 1. range contains legit gap in venue history
-                # 2. history actually (edge case) **began** at the
-                #    value `last_start_dt`
-                # 3. some other unknown error (ib blocking the
-                #    history-query bc they don't want you seeing how
-                #    they cucked all the tinas.. like with options
-                #    hist)
-                #
-                if def_frame_duration:
-                    # decrement by a duration's (frame) worth of time
-                    # as maybe indicated by the backend to see if we
-                    # can get older data before this possible
-                    # "history gap".
-                    last_start_dt: datetime = last_start_dt.subtract(
-                        seconds=def_frame_duration.total_seconds()
-                    )
-                    gap_report += (
-                        f'Decrementing `end_dt` and retrying with,\n'
-                        f'def_frame_duration: {def_frame_duration}\n'
-                        f'(new) last_start_dt: {last_start_dt}\n'
-                    )
-                    log.warning(gap_report)
-                    # skip writing to shm/tsdb and try the next
-                    # duration's worth of prior history.
+                # 3 cases:
+                # - frame in the middle of a legit venue gap
+                # - history actually began at the `last_start_dt`
+                # - some other unknown error (ib blocking the
+                #   history bc they don't want you seeing how they
+                #   cucked all the tinas..)
+                if dur := frame_types.get(timeframe):
+                    # decrement by a frame's worth of duration and
+                    # retry a few times.
+                    last_start_dt.subtract(
+                        seconds=dur.total_seconds()
+                    )
+                    log.warning(
+                        f'{mod.name} -> EMPTY FRAME for end_dt?\n'
+                        f'tf@fqme: {timeframe}@{mkt.fqme}\n'
+                        'bf_until <- last_start_dt:\n'
+                        f'{backfill_until_dt} <- {last_start_dt}\n'
+                        f'Decrementing `end_dt` by {dur} and retry..\n'
+                    )
                     continue
-                else:
-                    # await tractor.pause()
-                    raise DataUnavailable(gap_report)

             # broker says there never was or is no more history to pull
-            except DataUnavailable as due:
-                message: str = due.args[0]
+            except DataUnavailable:
                 log.warning(
-                    f'Provider {mod.name!r} halted backfill due to,\n\n'
-
-                    f'{message}\n'
-
-                    f'fqme: {mkt.fqme}\n'
-                    f'timeframe: {timeframe}\n'
-                    f'last_start_dt: {last_start_dt}\n'
-                    f'bf_until: {backfill_until_dt}\n'
+                    f'NO-MORE-DATA in range?\n'
+                    f'`{mod.name}` halted history:\n'
+                    f'tf@fqme: {timeframe}@{mkt.fqme}\n'
+                    'bf_until <- last_start_dt:\n'
+                    f'{backfill_until_dt} <- {last_start_dt}\n'
                 )
                 # UGH: what's a better way?
                 # TODO: backends are responsible for being correct on
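Both sides of the `NoData` handler implement the same decrement-and-retry idea: on an empty frame, step the query window back one frame's duration and re-request. As reconstructed, the removed side re-binds the result of `.subtract()` while the added side discards it; `pendulum` datetimes are immutable, so only the former actually moves the window. (Note also that the new signature types `frame_types` keys as `str` while this handler looks them up with the `int` `timeframe`.) A stdlib-only sketch of the pattern, with a hypothetical `fetch()` standing in for the backend's `get_hist()`:

```python
# sketch of the decrement-and-retry pattern; `fetch` is a
# hypothetical stand-in for the backend's history query.
from datetime import datetime, timedelta

def backfill_frame(
    fetch,                 # (end_dt) -> sequence of rows (may be empty)
    end_dt: datetime,
    frame_dur: timedelta,  # one frame's worth of wall-clock time
    max_retries: int = 3,
):
    for _ in range(max_retries):
        frame = fetch(end_dt)
        if frame:
            return frame
        # empty frame: assume a history gap and step back one frame
        end_dt = end_dt - frame_dur
    raise LookupError(f'no data at or before {end_dt}')
```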
@@ -490,54 +465,34 @@ async def start_backfill(
                 # to halt the request loop until the condition is
                 # resolved or should the backend be entirely in
                 # charge of solving such faults? yes, right?
-                # if timeframe > 1:
-                #     await tractor.pause()
                 return

-            time: np.ndarray = array['time']
             assert (
-                time[0]
+                array['time'][0]
                 ==
                 next_start_dt.timestamp()
             )
-            assert time[-1] == next_end_dt.timestamp()

-            expected_dur: Interval = last_start_dt - next_start_dt
+            diff = last_start_dt - next_start_dt
+            frame_time_diff_s = diff.seconds

             # frame's worth of sample-period-steps, in seconds
             frame_size_s: float = len(array) * timeframe
-            recv_frame_dur: Duration = (
-                from_timestamp(array[-1]['time'])
-                -
-                from_timestamp(array[0]['time'])
-            )
-            if (
-                (lt_frame := (recv_frame_dur < expected_dur))
-                or
-                (null_frame := (frame_size_s == 0))
-                # ^XXX, should NEVER hit now!
-            ):
+            expected_frame_size_s: float = frame_size_s + timeframe
+            if frame_time_diff_s > expected_frame_size_s:
+
                 # XXX: query result includes a start point prior to our
                 # expected "frame size" and thus is likely some kind of
                 # history gap (eg. market closed period, outage, etc.)
                 # so just report it to console for now.
-                if lt_frame:
-                    reason = 'Possible GAP (or first-datum)'
-                else:
-                    assert null_frame
-                    reason = 'NULL-FRAME'
-
-                missing_dur: Interval = expected_dur.end - recv_frame_dur.end
                 log.warning(
-                    f'{timeframe}s-series {reason} detected!\n'
-                    f'fqme: {mkt.fqme}\n'
-                    f'last_start_dt: {last_start_dt}\n\n'
-                    f'recv interval: {recv_frame_dur}\n'
-                    f'expected interval: {expected_dur}\n\n'
-                    f'Missing duration of history of {missing_dur.in_words()!r}\n'
-                    f'{missing_dur}\n'
+                    'GAP DETECTED:\n'
+                    f'last_start_dt: {last_start_dt}\n'
+                    f'diff: {diff}\n'
+                    f'frame_time_diff_s: {frame_time_diff_s}\n'
                 )
-                # await tractor.pause()

             to_push = diff_history(
                 array,
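The gap check on both sides reduces to the same arithmetic: a frame of `len(array)` samples at `timeframe` seconds each should span roughly `len(array) * timeframe` seconds of wall-clock time, and a materially shorter received span implies a history gap. A runnable sketch, assuming `pendulum` and a toy epoch-stamped frame in place of the real numpy shm array:

```python
# illustrative gap check over a toy frame of epoch-stamped rows.
from pendulum import from_timestamp

timeframe: int = 60  # sample period in seconds
array: list[dict[str, int]] = [
    {'time': 1_700_000_000 + i * timeframe}
    for i in range(10)
]

# expected span: one sample-period per row
frame_size_s: float = len(array) * timeframe

# received span: last timestamp minus first
recv_frame_dur = (
    from_timestamp(array[-1]['time'])
    - from_timestamp(array[0]['time'])
)

# a contiguous frame spans (len - 1) periods, hence the fudge term,
# mirroring the `+ timeframe` slack on the added side above.
if recv_frame_dur.total_seconds() < frame_size_s - timeframe:
    print('possible history gap!')
```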
@@ -612,8 +567,7 @@ async def start_backfill(
             # long-term storage.
             if (
                 storage is not None
-                and
-                write_tsdb
+                and write_tsdb
             ):
                 log.info(
                     f'Writing {ln} frame to storage:\n'
@@ -734,7 +688,7 @@ async def back_load_from_tsdb(
         last_tsdb_dt
         and latest_start_dt
     ):
-        backfilled_size_s: Duration = (
+        backfilled_size_s = (
             latest_start_dt - last_tsdb_dt
         ).seconds
     # if the shm buffer len is not large enough to contain
@@ -957,8 +911,6 @@ async def tsdb_backfill(
         f'{pformat(config)}\n'
     )

-    # concurrently load the provider's most-recent-frame AND any
-    # pre-existing tsdb history already saved in `piker` storage.
     dt_eps: list[DateTime, DateTime] = []
     async with trio.open_nursery() as tn:
         tn.start_soon(
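The removed comment described the surrounding code's intent: the provider's most-recent frame and any pre-existing tsdb history load concurrently. The `trio` idiom is to `start_soon()` one task and await the other inline inside the same nursery. A self-contained sketch with hypothetical loaders in place of the real queries:

```python
# concurrency sketch: background task via start_soon() while the
# second load is awaited inline; the nursery joins both on exit.
import trio

async def load_provider_frame(out: list[str]) -> None:
    await trio.sleep(0.1)  # stand-in for the backend history query
    out.append('most-recent frame')

async def load_tsdb_hist() -> str:
    await trio.sleep(0.1)  # stand-in for reading local storage
    return 'tsdb history'

async def main() -> None:
    frames: list[str] = []
    async with trio.open_nursery() as tn:
        tn.start_soon(load_provider_frame, frames)
        tsdb_entry: str = await load_tsdb_hist()
    print(frames, tsdb_entry)

trio.run(main)
```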
@@ -969,6 +921,7 @@ async def tsdb_backfill(
             timeframe,
             config,
         )
+
         tsdb_entry: tuple = await load_tsdb_hist(
             storage,
             mkt,
@@ -997,25 +950,6 @@ async def tsdb_backfill(
             mr_end_dt,
         ) = dt_eps

-        first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
-        calced_frame_size: Duration = mk_duration(
-            seconds=first_frame_dur_s,
-        )
-        # NOTE, attempt to use the backend declared default frame
-        # sizing (as allowed by their time-series query APIs) and
-        # if not provided try to construct a default from the
-        # first frame received above.
-        def_frame_durs: dict[
-            int,
-            Duration,
-        ]|None = config.get('frame_types', None)
-        if def_frame_durs:
-            def_frame_size: Duration = def_frame_durs[timeframe]
-            assert def_frame_size == calced_frame_size
-        else:
-            # use what we calced from first frame above.
-            def_frame_size = calced_frame_size
-
         # NOTE: when there's no offline data, there's 2 cases:
         # - data backend doesn't support timeframe/sample
         #   period (in which case `dt_eps` should be `None` and
@@ -1046,7 +980,7 @@ async def tsdb_backfill(
             partial(
                 start_backfill,
                 get_hist=get_hist,
-                def_frame_duration=def_frame_size,
+                frame_types=config.get('frame_types', None),
                 mod=mod,
                 mkt=mkt,
                 shm=shm,
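The block removed in the `@@ -997,25 +950,6 @@` hunk derived a default frame duration by measuring the first received frame and cross-checking it against any backend-declared `frame_types`; the branch instead passes `frame_types` straight through to `start_backfill` as above. A sketch of the removed derivation, with `pendulum.duration` standing in for piker's `mk_duration` helper:

```python
# illustrative re-implementation of the removed default-frame-size
# logic; `pendulum.duration` approximates piker's `mk_duration`.
import pendulum

def derive_def_frame_size(
    mr_start_dt: pendulum.DateTime,
    mr_end_dt: pendulum.DateTime,
    frame_types: dict[int, pendulum.Duration] | None,
    timeframe: int,
) -> pendulum.Duration:
    first_frame_dur_s: float = (mr_end_dt - mr_start_dt).total_seconds()
    calced = pendulum.duration(seconds=first_frame_dur_s)
    if frame_types:
        declared = frame_types[timeframe]
        # a backend-declared size should match what we measured
        assert declared == calced, (declared, calced)
        return declared
    # no declaration: fall back to the measured first frame
    return calced
```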

poetry.lock (generated, 1751 lines): file diff suppressed because it is too large.

pyproject.toml:
@@ -15,8 +15,8 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see <https://www.gnu.org/licenses/>.

 [build-system]
-requires = ["hatchling"]
-build-backend = "hatchling.build"
+requires = ["poetry-core"]
+build-backend = "poetry.core.masonry.api"

 # ------ - ------
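Both variants of the `[build-system]` table serve the same PEP 517 role: a build front-end reads `build-backend` from `pyproject.toml` and imports that module to build the project. A stdlib-only sketch of that discovery step:

```python
# how a PEP 517 front-end locates the backend from pyproject.toml.
import tomllib

with open('pyproject.toml', 'rb') as f:
    meta = tomllib.load(f)

backend: str = meta['build-system']['build-backend']
# "hatchling.build" on the removed side,
# "poetry.core.masonry.api" on the added side.
print(backend)
```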
@@ -25,123 +25,132 @@ build-backend = "hatchling.build"
 ignore = []

 # https://docs.astral.sh/ruff/settings/#lint_per-file-ignores
-# "piker/ui/qt.py" = [
-#     "E402",
-#     'F401',  # unused imports (without __all__ or blah as blah)
-#     # "F841", # unused variable rules
-# ]
+"piker/ui/qt.py" = [
+    "E402",
+    'F401',  # unused imports (without __all__ or blah as blah)
+    # "F841", # unused variable rules
+]
 # ignore-init-module-imports = false

 # ------ - ------

-[project]
+[tool.poetry]
 name = "piker"
-version = "0.1.0a0dev0"
+version = "0.1.0.alpha0.dev0"
 description = "trading gear for hackers"
-authors = [{ name = "Tyler Goodlet", email = "goodboy_foss@protonmail.com" }]
-requires-python = ">=3.12, <3.13"
-license = "AGPL-3.0-or-later"
+authors = ["Tyler Goodlet <goodboy_foss@protonmail.com>"]
+license = "AGPLv3"
 readme = "README.rst"
-keywords = [
-  "async",
-  "trading",
-  "finance",
-  "quant",
-  "charting",
+
+# ------ - ------
+
+[tool.poetry.dependencies]
+async-generator = "^1.10"
+attrs = "^23.1.0"
+bidict = "^0.22.1"
+colorama = "^0.4.6"
+colorlog = "^6.7.0"
+ib-insync = "^0.9.86"
+msgspec = "^0.18.6"
+numba = "^0.59.0"
+numpy = "^1.25"
+polars = "^0.18.13"
+pygments = "^2.16.1"
+python = ">=3.11, <3.13"
+rich = "^13.5.2"
+# setuptools = "^68.0.0"
+tomli = "^2.0.1"
+tomli-w = "^1.0.0"
+trio-util = "^0.7.0"
+trio-websocket = "^0.10.3"
+typer = "^0.9.0"
+rapidfuzz = "^3.5.2"
+pdbp = "^1.5.0"
+trio = "^0.24"
+pendulum = "^3.0.0"
+httpx = "^0.27.0"
+cryptofeed = "^2.4.0"
+pyarrow = "^17.0.0"
+
+[tool.poetry.dependencies.tractor]
+develop = true
+git = 'https://pikers.dev/goodboy/tractor.git'
+branch = 'aio_abandons'
+# path = "../tractor"
+
+[tool.poetry.dependencies.asyncvnc]
+git = 'https://github.com/pikers/asyncvnc.git'
+branch = 'main'
+
+[tool.poetry.dependencies.tomlkit]
+develop = true
+git = 'https://github.com/pikers/tomlkit.git'
+branch = 'piker_pin'
+# path = "../tomlkit/"
+
+[tool.poetry.group.uis]
+optional = true
+
+[tool.poetry.group.uis.dependencies]
+# https://python-poetry.org/docs/managing-dependencies/#dependency-groups
+# TODO: make sure the levenshtein shit compiles on nix..
+# rapidfuzz = {extras = ["speedup"], version = "^0.18.0"}
+rapidfuzz = "^3.2.0"
+qdarkstyle = ">=3.0.2"
+pyqtgraph = { git = 'https://github.com/pikers/pyqtgraph.git' }
+
+# ------ - ------
+
+pyqt6 = "^6.7.0"
+
+[tool.poetry.group.dev]
+optional = true
+
+[tool.poetry.group.dev.dependencies]
+# testing / CI
+pytest = "^6.0.0"
+elasticsearch = "^8.9.0"
+xonsh = "^0.14.2"
+prompt-toolkit = "3.0.40"
+cython = "^3.0.0"
+greenback = "^1.1.1"
+
+# console ehancements and eventually remote debugging
+# extras/helpers.
+# TODO: add a toolset that makes debugging a `pikerd` service
+# (tree) easy to hack on directly using more or less the local env:
+# - xonsh + xxh
+# - rsyscall + pdbp
+# - actor runtime control console like BEAM/OTP
+
+# ------ - ------
+
+# TODO: add an `--only daemon` group for running non-ui / pikerd
+# service tree in distributed mode B)
+# https://python-poetry.org/docs/managing-dependencies/#installing-group-dependencies
+# [tool.poetry.group.daemon.dependencies]
+
+[tool.poetry.scripts]
+piker = 'piker.cli:cli'
+pikerd = 'piker.cli:pikerd'
+ledger = 'piker.accounting.cli:ledger'
+
+[project]
+keywords=[
+  "async",
+  "trading",
+  "finance",
+  "quant",
+  "charting",
 ]
-classifiers = [
-  "Development Status :: 3 - Alpha",
+classifiers=[
+  'Development Status :: 3 - Alpha',
   "License :: OSI Approved :: GNU Affero General Public License v3 or later (AGPLv3+)",
-  "Operating System :: POSIX :: Linux",
+  'Operating System :: POSIX :: Linux',
   "Programming Language :: Python :: Implementation :: CPython",
   "Programming Language :: Python :: 3 :: Only",
   "Programming Language :: Python :: 3.11",
   "Programming Language :: Python :: 3.12",
-  "Intended Audience :: Financial and Insurance Industry",
-  "Intended Audience :: Science/Research",
-  "Intended Audience :: Developers",
-  "Intended Audience :: Education",
+  'Intended Audience :: Financial and Insurance Industry',
+  'Intended Audience :: Science/Research',
+  'Intended Audience :: Developers',
+  'Intended Audience :: Education',
 ]
-dependencies = [
-  "async-generator >=1.10, <2.0.0",
-  "attrs >=23.1.0, <24.0.0",
-  "bidict >=0.22.1, <0.23.0",
-  "colorama >=0.4.6, <0.5.0",
-  "colorlog >=6.7.0, <7.0.0",
-  "ib-insync >=0.9.86, <0.10.0",
-  "numba >=0.59.0, <0.60.0",
-  "numpy >=1.25, <2.0",
-  "polars >=0.18.13, <0.19.0",
-  "pygments >=2.16.1, <3.0.0",
-  "rich >=13.5.2, <14.0.0",
-  "tomli >=2.0.1, <3.0.0",
-  "tomli-w >=1.0.0, <2.0.0",
-  "trio-util >=0.7.0, <0.8.0",
-  "trio-websocket >=0.10.3, <0.11.0",
-  "typer >=0.9.0, <1.0.0",
-  "rapidfuzz >=3.5.2, <4.0.0",
-  "pdbp >=1.5.0, <2.0.0",
-  "trio >=0.24, <0.25",
-  "pendulum >=3.0.0, <4.0.0",
-  "httpx >=0.27.0, <0.28.0",
-  "cryptofeed >=2.4.0, <3.0.0",
-  "pyarrow >=17.0.0, <18.0.0",
-  "websockets ==12.0",
-  "msgspec",
-  "tractor",
-  "asyncvnc",
-  "tomlkit",
-]
-
-[project.optional-dependencies]
-uis = [
-  # https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
-  # TODO: make sure the levenshtein shit compiles on nix..
-  # rapidfuzz = {extras = ["speedup"], version = "^0.18.0"}
-  "rapidfuzz >=3.2.0, <4.0.0",
-  "qdarkstyle >=3.0.2, <4.0.0",
-  "pyqt6 >=6.7.0, <7.0.0",
-  "pyqtgraph",
-
-  # ------ - ------
-
-  # TODO: add an `--only daemon` group for running non-ui / pikerd
-  # service tree in distributed mode B)
-  # https://docs.astral.sh/uv/concepts/projects/dependencies/#optional-dependencies
-  # [project.optional-dependencies]
-]
-
-[dependency-groups]
-dev = [
-  "pytest >=6.0.0, <7.0.0",
-  "elasticsearch >=8.9.0, <9.0.0",
-  "xonsh >=0.14.2, <0.15.0",
-  "prompt-toolkit ==3.0.40",
-  "cython >=3.0.0, <4.0.0",
-  "greenback >=1.1.1, <2.0.0",
-
-  # console ehancements and eventually remote debugging
-  # extras/helpers.
-  # TODO: add a toolset that makes debugging a `pikerd` service
-  # (tree) easy to hack on directly using more or less the local env:
-  # - xonsh + xxh
-  # - rsyscall + pdbp
-  # - actor runtime control console like BEAM/OTP
-]
-
-[project.scripts]
-piker = "piker.cli:cli"
-pikerd = "piker.cli:pikerd"
-ledger = "piker.accounting.cli:ledger"
-
-[tool.hatch.build.targets.sdist]
-include = ["piker"]
-
-[tool.hatch.build.targets.wheel]
-include = ["piker"]
-
-[tool.uv.sources]
-pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
-asyncvnc = { git = "https://github.com/pikers/asyncvnc.git", branch = "main" }
-tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
-msgspec = { git = "https://github.com/jcrist/msgspec.git" }
-tractor = { path = "../tractor" }
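The removed PEP 621 `dependencies` entries are standard PEP 508 requirement strings, with `[tool.uv.sources]` overriding where the bare-named ones (`msgspec`, `tractor`, `asyncvnc`, `tomlkit`) resolve from; the poetry side encodes equivalent pins in its own caret syntax. A sketch parsing a few of the removed entries with the third-party `packaging` library:

```python
# parse PEP 508 requirement strings like those in the removed
# `dependencies` list; `packaging` is a common third-party lib.
from packaging.requirements import Requirement

for raw in (
    'trio >=0.24, <0.25',
    'pendulum >=3.0.0, <4.0.0',
    'websockets ==12.0',
):
    req = Requirement(raw)
    print(req.name, str(req.specifier))
```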

uv.lock (1473 lines): file diff suppressed because it is too large.