Compare commits

...

45 Commits

Author SHA1 Message Date
Nelson Torres c8bb8d7893 config refactor
only one get_config method for api class and cryptofeed feed handler
2025-01-29 01:03:27 -03:00
Nelson Torres b4141d4208 move constants to venue 2025-01-29 01:03:27 -03:00
Nelson Torres 468b33cf60 refactor redundant code 2025-01-29 01:03:27 -03:00
Nelson Torres 2c2fa990ed name formatting fixes 2025-01-29 01:03:27 -03:00
Nelson Torres 7bc98f5055 get_mkt_info cleanup 2025-01-29 01:03:27 -03:00
Nelson Torres 8b4bcf134c cache_symbols refactor 2025-01-29 01:03:27 -03:00
Nelson Torres 3388bc89cc json_rpc_auth_wrapper 2025-01-29 01:03:27 -03:00
Nelson Torres b84880cce5 move object classes to venue 2025-01-29 01:03:27 -03:00
Nelson Torres 7f5e655720 Added options symbols to get_assets 2025-01-29 01:03:27 -03:00
Tyler Goodlet b8ad77e861 .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which i'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2025-01-29 01:03:27 -03:00
Tyler Goodlet dbafb64bc3 `kucoin`: repair live quotes streaming..
This must have broke at some point during the new `MktPair` and thus
`.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict`
was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being
processed by the downstream sampling and feed subsys.

So fix that as well as a few other refinements,
- set the `topic: mkt.bs_fqme` in quote msgs obvi.
- drop the "wait for first clearing vlm" quote poll loop; going to fix
  the sampler to handle a `first_quote` without a `'last'` key.
- add some typing around calls to `get_mkt_info()`.
- rename `stream_messages()` -> `iter_normed_quotes()`.
2025-01-29 01:03:27 -03:00
Nelson Torres 00fb5ceddd Deleted settlePlan field from binance FutesPair. 2025-01-29 01:03:27 -03:00
Nelson Torres e8d26f8b30 Added missing fields for kucoin.
feeCategory, makerFeeCoefficient, takerFeeCoefficient and st.
2025-01-29 01:03:27 -03:00
Nelson Torres fb76d1c9ed get_assets now uses public endpoint
It's better if the data is available through a public endpoint.
2025-01-29 01:03:27 -03:00
Nelson Torres 9688f4899a now using exch_info in search_symbols 2025-01-29 01:03:27 -03:00
Nelson Torres 7298374fe4 Fix bs_fqme using venue and expiry 2025-01-29 01:03:27 -03:00
Nelson Torres a7d8b5e1f4 Added expiry property for OptionPair 2025-01-29 01:03:27 -03:00
Nelson Torres 65e4bd80b2 No longer needed 2025-01-29 01:03:27 -03:00
Nelson Torres 660c580786 bs_mktid instead bs_fqme for deribits options 2025-01-29 01:03:27 -03:00
Nelson Torres a2651fd1de Fixed pair instrument name in search_symbols endpoint.
Fixed instrument in bars endpoint, for options in deribits bs_mktid instead bs_fqme.
Fixed the id is in msg.
2025-01-29 01:03:27 -03:00
Tyler Goodlet 12d557f12b data._web_bs: try to raise jsonrpc errors in parent task 2025-01-29 01:03:27 -03:00
Nelson Torres 8365b6d373 Add necessary classes in init file for deribit 2025-01-29 01:03:27 -03:00
Nelson Torres badb7342f7 Minor refactor in open_symbol_search 2025-01-29 01:03:27 -03:00
Nelson Torres b1bac1dc9f stream_quotes now using FeedInit 2025-01-29 01:03:27 -03:00
Nelson Torres be660f0275 symbol_info refactor 2025-01-29 01:03:27 -03:00
Nelson Torres 41e69af090 search_symbols output type fix 2025-01-29 01:03:27 -03:00
Nelson Torres 01e8fe18dd add get_mkt_pairs method 2025-01-29 01:03:27 -03:00
Nelson Torres b104e603db get_assets refactor 2025-01-29 01:03:27 -03:00
Nelson Torres 67424562ba formatting 2025-01-29 01:03:27 -03:00
Nelson Torres 0a92f54b59 created exch_info in api class 2025-01-29 01:03:27 -03:00
Nelson Torres bf1e2da1a7 modify self_pairs type to ChainMap 2025-01-29 01:03:27 -03:00
Nelson Torres d11e1d1ee4 Necessary imports 2025-01-29 01:03:27 -03:00
Nelson Torres cf2b507453 add get_market_info 2025-01-29 01:03:27 -03:00
Nelson Torres 61bde5a58d Necessary imports 2025-01-29 01:03:27 -03:00
Nelson Torres 43ec2e1897 minor fixes in venues 2025-01-29 01:03:27 -03:00
Nelson Torres d7669ba18b add class Pair in venues, PAIRTYPES for future uses 2025-01-29 01:03:27 -03:00
Nelson Torres 3821be082b fix syms for venues.
little refactor in get_config, and created get_fh_config for cryptofeed.
2025-01-29 01:03:27 -03:00
Nelson Torres 25d60d6bdd venues for deribit 2025-01-29 01:03:27 -03:00
Nelson Torres a9178b211d Added cryptofeed and pyarrow necessary for the feed, enable deribit
in the brokers init file, at this point the feed is working, to check
the tables use vd tool.
2025-01-29 01:03:27 -03:00
Nelson Torres ec2203168a `default.nix` is created with required nixos deps "wiring"
`pyproject.toml` update to use pikers.dev/goodboy/tractor.git, branch: `aio_abandons`.
2025-01-29 00:34:25 -03:00
Nelson Torres 69b1bb57ea Updated tractor method name. 2025-01-29 00:17:36 -03:00
Tyler Goodlet 1f2755ee36 Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2025-01-29 00:17:36 -03:00
Tyler Goodlet ff171af168 Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use an factory `@acm`
  to guarantee a single-instance-per-actor using a niche approach for a
  singleton object using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2025-01-29 00:17:36 -03:00
Tyler Goodlet aa4a89451a Lel, forgot to add a `SPOT` venue for `binance`.. 2025-01-29 00:17:36 -03:00
Tyler Goodlet a461fcc3e3 Mask no-data pause-point, add perps to sig.
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to `start_backfill()` input literal set.
2025-01-29 00:17:36 -03:00
27 changed files with 3204 additions and 864 deletions

82
default.nix 100644
View File

@ -0,0 +1,82 @@
with (import <nixpkgs> {});
with python312Packages;
let
glibStorePath = lib.getLib glib;
qtpyStorePath = lib.getLib qtpy;
pyqt6StorePath = lib.getLib pyqt6;
pyqt6SipStorePath = lib.getLib pyqt6-sip;
qt6baseStorePath = lib.getLib qt6.qtbase;
rapidfuzzStorePath = lib.getLib rapidfuzz;
qdarkstyleStorePath = lib.getLib qdarkstyle;
in
stdenv.mkDerivation {
name = "piker-qt6-poetry-shell";
buildInputs = [
# System requirements.
glib
qt6.qtbase
libgcc.lib
# Python requirements.
python312Full
poetry-core
qdarkstyle
rapidfuzz
pyqt6
qtpy
];
src = null;
shellHook = ''
set -e
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${libgcc.lib}/lib:${glibStorePath}/lib
# Set the Qt plugin path
# export QT_DEBUG_PLUGINS=1
QTBASE_PATH="${qt6baseStorePath}"
echo "qtbase path: $QTBASE_PATH"
echo ""
export QT_PLUGIN_PATH="$QTBASE_PATH/lib/qt-6/plugins"
export QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
echo "qt plugin path: $QT_PLUGIN_PATH"
echo ""
# Maybe create venv & install deps
poetry install --with uis
# Use pyqt6 from System, patch activate script
ACTIVATE_SCRIPT_PATH="$(poetry env info --path)/bin/activate"
export RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
export QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
export QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
export PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
export PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
echo "rapidfuzz at: $RPDFUZZ_PATH"
echo "qdarkstyle at: $QDRKSTYLE_PATH"
echo "qtpy at: $QTPY_PATH"
echo "pyqt6 at: $PYQT6_PATH"
echo "pyqt6-sip at: $PYQT6_SIP_PATH"
echo ""
PATCH="export PYTHONPATH=\""
PATCH="$PATCH\$RPDFUZZ_PATH"
PATCH="$PATCH:\$QDRKSTYLE_PATH"
PATCH="$PATCH:\$QTPY_PATH"
PATCH="$PATCH:\$PYQT6_PATH"
PATCH="$PATCH:\$PYQT6_SIP_PATH"
PATCH="$PATCH\""
if grep -q "$PATCH" "$ACTIVATE_SCRIPT_PATH"; then
echo "venv is already patched."
else
echo "patching $ACTIVATE_SCRIPT_PATH to use pyqt6 from nixos..."
sed -i "\$i$PATCH" $ACTIVATE_SCRIPT_PATH
fi
poetry shell
'';
}

View File

@ -51,6 +51,7 @@ __brokers__: list[str] = [
'ib', 'ib',
'kraken', 'kraken',
'kucoin', 'kucoin',
'deribit',
# broken but used to work # broken but used to work
# 'questrade', # 'questrade',
@ -61,7 +62,6 @@ __brokers__: list[str] = [
# wstrade # wstrade
# iex # iex
# deribit
# bitso # bitso
] ]

View File

@ -23,6 +23,7 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -190,14 +191,17 @@ def broker_init(
async def spawn_brokerd( async def spawn_brokerd(
brokername: str, brokername: str,
loglevel: str | None = None, loglevel: str | None = None,
**tractor_kwargs, **tractor_kwargs,
) -> bool: ) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
@ -217,27 +221,35 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its # ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery # actor nursery
from piker.service import Services from piker.service import (
get_service_mngr,
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' ServiceMngr,
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
debug_mode=Services.debug_mode,
**tractor_kwargs
) )
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
# NOTE: the service mngr expects an already spawned actor + its mngr: ServiceMngr = get_service_mngr()
# portal ref in order to do non-blocking setup of brokerd ctx: tractor.Context = await mngr.start_service(
# service nursery. daemon_name=dname,
await Services.start_service_task( ctx_ep=partial(
dname,
portal,
# signature of target root-task endpoint # signature of target root-task endpoint
daemon_fixture_ep, daemon_fixture_ep,
# passed to daemon_fixture_ep(**kwargs)
brokername=brokername, brokername=brokername,
loglevel=loglevel, loglevel=loglevel,
),
debug_mode=mngr.debug_mode,
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
**tractor_kwargs
)
assert (
not ctx.cancel_called
and ctx.portal # parent side
and dname in ctx.chan.uid # subactor is named as desired
) )
return True return True
@ -262,8 +274,7 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
service_name=f'brokerd.{brokername}',
f'brokerd.{brokername}',
service_task_target=spawn_brokerd, service_task_target=spawn_brokerd,
spawn_args={ spawn_args={
'brokername': brokername, 'brokername': brokername,

View File

@ -567,6 +567,7 @@ class Client:
) -> str: ) -> str:
return { return {
'USDTM': 'usdtm_futes', 'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes', # 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..? # ^-TODO-^ bc someone might want it..?
}[pair.venue] }[pair.venue]

View File

@ -181,7 +181,6 @@ class FutesPair(Pair):
quoteAsset: str # 'USDT', quoteAsset: str # 'USDT',
quotePrecision: int # 8, quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000', requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500', triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'], underlyingSubType: list[str] # ['PoW'],

View File

@ -25,6 +25,7 @@ from .api import (
get_client, get_client,
) )
from .feed import ( from .feed import (
get_mkt_info,
open_history_client, open_history_client,
open_symbol_search, open_symbol_search,
stream_quotes, stream_quotes,
@ -34,15 +35,20 @@ from .feed import (
# open_trade_dialog, # open_trade_dialog,
# norm_trade_records, # norm_trade_records,
# ) # )
from .venues import (
OptionPair,
)
log = get_logger(__name__) log = get_logger(__name__)
__all__ = [ __all__ = [
'get_client', 'get_client',
# 'trades_dialogue', # 'trades_dialogue',
'get_mkt_info',
'open_history_client', 'open_history_client',
'open_symbol_search', 'open_symbol_search',
'stream_quotes', 'stream_quotes',
'OptionPair',
# 'norm_trade_records', # 'norm_trade_records',
] ]

View File

@ -19,10 +19,14 @@ Deribit backend.
''' '''
import asyncio import asyncio
from collections import ChainMap
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from datetime import datetime from datetime import datetime
from decimal import (
Decimal,
)
from functools import partial from functools import partial
import time import time
from typing import ( from typing import (
@ -31,7 +35,7 @@ from typing import (
Callable, Callable,
) )
import pendulum from pendulum import now
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
@ -51,7 +55,25 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from .venues import (
_ws_url,
MarketType,
PAIRTYPES,
Pair,
OptionPair,
JSONRPCResult,
JSONRPCChannel,
KLinesResult,
Trade,
LastTradesResult,
)
from piker.accounting import (
Asset,
digits_to_dec,
MktPair,
)
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
@ -74,57 +96,6 @@ _spawn_kwargs = {
} }
_url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2'
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: Optional[list[dict]] = None
error: Optional[dict] = None
usIn: int
usOut: int
usDiff: int
testnet: bool
class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str
params: dict
class KLinesResult(Struct):
close: list[float]
cost: list[float]
high: list[float]
low: list[float]
open: list[float]
status: str
ticks: list[int]
volume: list[float]
class Trade(Struct):
trade_seq: int
trade_id: str
timestamp: int
tick_direction: int
price: float
mark_price: float
iv: float
instrument_name: str
index_price: float
direction: str
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
amount: float
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool
# convert datetime obj timestamp to unixtime in milliseconds # convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when): def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000)) return int((when.timestamp() * 1000) + (when.microsecond / 1000))
@ -142,13 +113,15 @@ def str_to_cb_sym(name: str) -> Symbol:
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
return Symbol( return Symbol(
base, quote, base=base,
quote=quote,
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date, expiry_date=new_expiry_date)
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol: def piker_sym_to_cb_sym(name: str) -> Symbol:
@ -165,77 +138,132 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
return Symbol( return Symbol(
base, quote, base=base,
quote=quote,
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date.upper()) expiry_date=expiry_date)
def cb_sym_to_deribit_inst(sym: Symbol): def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
otype = 'C' if sym.option_type == CALL else 'P' otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
# deribit specific
cb_norm = [
'F', 'G', 'H', 'J',
'K', 'M', 'N', 'Q',
'U', 'V', 'X', 'Z'
]
months = [
'JAN', 'FEB', 'MAR', 'APR',
'MAY', 'JUN', 'JUL', 'AUG',
'SEP', 'OCT', 'NOV', 'DEC'
]
# YYMDD
# 01234
day, month, year = (
expiry_date[3:],
months[cb_norm.index(expiry_date[2:3])],
expiry_date[:2]
)
return f'{day}{month}{year}'
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
conf, path = config.load() conf: dict
path: Path
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section = conf.get('deribit') section = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
if section is None: if section is None:
log.warning(f'No config section found for deribit in {path}') log.warning(f'No config section found for deribit in {path}')
return {}
return conf conf_option = section.get('option', {})
section.clear # clear the dict to reuse it
section['deribit'] = {}
section['deribit']['key_id'] = conf_option.get('api_key')
section['deribit']['key_secret'] = conf_option.get('api_secret')
section['log'] = {}
section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG'
return section
class Client: class Client:
def __init__(self, json_rpc: Callable) -> None: def __init__(
self._pairs: dict[str, Any] = None self,
json_rpc: Callable
) -> None:
self._pairs: ChainMap[str, Pair] = ChainMap()
config = get_config().get('deribit', {}) config = get_config().get('deribit', {})
if ('key_id' in config) and ('key_secret' in config): self._key_id = config.get('key_id')
self._key_id = config['key_id'] self._key_secret = config.get('key_secret')
self._key_secret = config['key_secret']
else:
self._key_id = None
self._key_secret = None
self.json_rpc = json_rpc self.json_rpc = json_rpc
@property self._auth_ts = None
def currencies(self): self._auth_renew_ts = 5 # seconds to renew auth
return ['btc', 'eth', 'sol', 'usd']
async def get_balances(self, kind: str = 'option') -> dict[str, float]: async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
"""Background task that adquires a first access token and then will
refresh the access token.
https://docs.deribit.com/?python#authentication-2
"""
access_scope = 'trade:read_write'
current_ts = time.time()
if not self._auth_ts or current_ts - self._auth_ts < self._auth_renew_ts:
# if we are close to token expiry time
params = {
'grant_type': 'client_credentials',
'client_id': self._key_id,
'client_secret': self._key_secret,
'scope': access_scope
}
resp = await self.json_rpc('public/auth', params)
result = resp.result
self._auth_ts = time.time() + result['expires_in']
return await self.json_rpc(*args, **kwargs)
async def get_balances(
self,
kind: str = 'option'
) -> dict[str, float]:
"""Return the set of positions for this account """Return the set of positions for this account
by symbol. by symbol.
""" """
balances = {} balances = {}
for currency in self.currencies: for currency in self.currencies:
resp = await self.json_rpc( resp = await self._json_rpc_auth_wrapper(
'private/get_positions', params={ 'private/get_positions', params={
'currency': currency.upper(), 'currency': currency.upper(),
'kind': kind}) 'kind': kind})
@ -244,20 +272,46 @@ class Client:
return balances return balances
async def get_assets(self) -> dict[str, float]: async def get_assets(
self,
venue: str | None = None,
) -> dict[str, Asset]:
"""Return the set of asset balances for this account """Return the set of asset balances for this account
by symbol. by symbol.
""" """
balances = {} assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
currencies = resp.result
for currency in currencies:
name = currency['currency']
tx_tick = digits_to_dec(currency['fee_precision'])
atype='crypto_currency'
assets[name] = Asset(
name=name,
atype=atype,
tx_tick=tx_tick)
for currency in self.currencies: instruments = await self.symbol_info(currency=name)
resp = await self.json_rpc( for instrument in instruments:
'private/get_account_summary', params={ pair = instruments[instrument]
'currency': currency.upper()}) assets[pair.symbol] = Asset(
name=pair.symbol,
atype=pair.venue,
tx_tick=pair.size_tick)
balances[currency] = resp.result['balance'] return assets
return balances async def get_mkt_pairs(self) -> dict[str, Pair]:
flat: dict[str, Pair] = {}
for key in self._pairs:
item = self._pairs.get(key)
flat[item.bs_fqme] = item
return flat
async def submit_limit( async def submit_limit(
self, self,
@ -274,7 +328,7 @@ class Client:
'type': 'limit', 'type': 'limit',
'price': price, 'price': price,
} }
resp = await self.json_rpc( resp = await self._json_rpc_auth_wrapper(
f'private/{action}', params) f'private/{action}', params)
return resp.result return resp.result
@ -282,10 +336,32 @@ class Client:
async def submit_cancel(self, oid: str): async def submit_cancel(self, oid: str):
"""Send cancel request for order id """Send cancel request for order id
""" """
resp = await self.json_rpc( resp = await self._json_rpc_auth_wrapper(
'private/cancel', {'order_id': oid}) 'private/cancel', {'order_id': oid})
return resp.result return resp.result
async def exch_info(
self,
sym: str | None = None,
venue: MarketType = 'option',
expiry: str | None = None,
) -> dict[str, Pair] | Pair:
pair_table: dict[str, Pair] = self._pairs
if (
sym
and (cached_pair := pair_table.get(sym))
):
return cached_pair
if sym:
return pair_table[sym]
else:
return self._pairs
async def symbol_info( async def symbol_info(
self, self,
instrument: Optional[str] = None, instrument: Optional[str] = None,
@ -293,7 +369,7 @@ class Client:
kind: str = 'option', kind: str = 'option',
expired: bool = False expired: bool = False
) -> dict[str, dict]: ) -> dict[str, Pair] | Pair:
''' '''
Get symbol infos. Get symbol infos.
@ -308,28 +384,65 @@ class Client:
'expired': str(expired).lower() 'expired': str(expired).lower()
} }
resp: JSONRPCResult = await self.json_rpc( resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
'public/get_instruments', 'public/get_instruments',
params, params,
) )
# convert to symbol-keyed table # convert to symbol-keyed table
pair_type: Type = PAIRTYPES[kind]
results: list[dict] | None = resp.result results: list[dict] | None = resp.result
instruments: dict[str, dict] = {
item['instrument_name'].lower(): item instruments: dict[str, Pair] = {}
for item in results for item in results:
} symbol=item['instrument_name'].lower()
try:
pair: Pair = pair_type(
symbol=symbol,
**item
)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid deribit changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://docs.deribit.com/?python#deribit-api-v2-1-1'
)
raise
instruments[symbol] = pair
if instrument is not None: if instrument is not None:
return instruments[instrument] return instruments[instrument.lower()]
else: else:
return instruments return instruments
async def cache_symbols( async def cache_symbols(
self, self,
) -> dict: venue: MarketType = 'option',
if not self._pairs: ) -> None:
self._pairs = await self.symbol_info() # lookup internal mkt-specific pair table to update
pair_table: dict[str, Pair] = self._pairs
# make API request(s)
mkt_pairs = await self.symbol_info()
if not mkt_pairs:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs:
pair_type: Type = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
pair_table[pair.symbol.upper()] = pair
# update an additional top-level-cross-venue-table
# `._pairs: ChainMap` for search B0
pairs_view_subtable[pair.bs_fqme] = pair
self._pairs.maps.append(pairs_view_subtable)
return self._pairs return self._pairs
@ -337,37 +450,35 @@ class Client:
self, self,
pattern: str, pattern: str,
limit: int = 30, limit: int = 30,
) -> dict[str, Any]: ) -> dict[str, Pair]:
''' '''
Fuzzy search symbology set for pairs matching `pattern`. Fuzzy search symbology set for pairs matching `pattern`.
''' '''
pairs: dict[str, Any] = await self.symbol_info() pairs: dict[str, Pair] = await self.exch_info()
matches: dict[str, Pair] = match_from_pairs(
return match_from_pairs(
pairs=pairs, pairs=pairs,
query=pattern.upper(), query=pattern.upper(),
score_cutoff=35, score_cutoff=35,
limit=limit limit=limit
) )
# repack in name-keyed table
return {
pair['instrument_name'].lower(): pair
for pair in matches.values()
}
async def bars( async def bars(
self, self,
symbol: str, mkt: MktPair,
start_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None,
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
) -> dict:
instrument = symbol ) -> list[tuple] | np.ndarray:
instrument: str = mkt.bs_fqme.split('.')[0]
if end_dt is None: if end_dt is None:
end_dt = pendulum.now('UTC') end_dt = now('UTC')
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
@ -377,7 +488,7 @@ class Client:
end_time = deribit_timestamp(end_dt) end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data # https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self.json_rpc( resp = await self._json_rpc_auth_wrapper(
'public/get_tradingview_chart_data', 'public/get_tradingview_chart_data',
params={ params={
'instrument_name': instrument.upper(), 'instrument_name': instrument.upper(),
@ -387,36 +498,34 @@ class Client:
}) })
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars = [] new_bars: list[tuple] = []
for i in range(len(result.close)): for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [ row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
result.high[i], result.high[i],
result.low[i], result.low[i],
result.close[i], result.close[i],
result.volume[i], result.volume[i]
0
] ]
new_bars.append((i,) + tuple(row)) new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines if not as_np:
return array return result
return np.array(
new_bars,
dtype=def_iohlcv_fields
)
async def last_trades( async def last_trades(
self, self,
instrument: str, instrument: str,
count: int = 10 count: int = 10
): ):
resp = await self.json_rpc( resp = await self._json_rpc_auth_wrapper(
'public/get_last_trades_by_instrument', 'public/get_last_trades_by_instrument',
params={ params={
'instrument_name': instrument, 'instrument_name': instrument,
@ -428,78 +537,17 @@ class Client:
@acm @acm
async def get_client( async def get_client(
is_brokercheck: bool = False is_brokercheck: bool = False,
venue: MarketType = 'option',
) -> Client: ) -> Client:
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_jsonrpc_session( open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc _ws_url, response_type=JSONRPCResult
) as json_rpc
): ):
client = Client(json_rpc) client = Client(json_rpc)
_refresh_token: Optional[str] = None
_access_token: Optional[str] = None
async def _auth_loop(
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
):
"""Background task that adquires a first access token and then will
refresh the access token while the nursery isn't cancelled.
https://docs.deribit.com/?python#authentication-2
"""
renew_time = 10
access_scope = 'trade:read_write'
_expiry_time = time.time()
got_access = False
nonlocal _refresh_token
nonlocal _access_token
while True:
if time.time() - _expiry_time < renew_time:
# if we are close to token expiry time
if _refresh_token != None:
# if we have a refresh token already dont need to send
# secret
params = {
'grant_type': 'refresh_token',
'refresh_token': _refresh_token,
'scope': access_scope
}
else:
# we don't have refresh token, send secret to initialize
params = {
'grant_type': 'client_credentials',
'client_id': client._key_id,
'client_secret': client._key_secret,
'scope': access_scope
}
resp = await json_rpc('public/auth', params)
result = resp.result
_expiry_time = time.time() + result['expires_in']
_refresh_token = result['refresh_token']
if 'access_token' in result:
_access_token = result['access_token']
if not got_access:
# first time this loop runs we must indicate task is
# started, we have auth
got_access = True
task_status.started()
else:
await trio.sleep(renew_time / 2)
# if we have client creds launch auth loop
if client._key_id is not None:
await n.start(_auth_loop)
await client.cache_symbols() await client.cache_symbols()
yield client yield client
n.cancel_scope.cancel() n.cancel_scope.cancel()
@ -523,7 +571,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay( async def aio_price_feed_relay(
fh: FeedHandler, fh: FeedHandler,
instrument: Symbol, instrument: str,
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
@ -542,21 +590,33 @@ async def aio_price_feed_relay(
'symbol': cb_sym_to_deribit_inst( 'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(), str_to_cb_sym(data.symbol)).lower(),
'ticks': [ 'ticks': [
{'type': 'bid', {
'price': float(data.bid_price), 'size': float(data.bid_size)}, 'type': 'bid',
{'type': 'bsize', 'price': float(data.bid_price),
'price': float(data.bid_price), 'size': float(data.bid_size)}, 'size': float(data.bid_size)
{'type': 'ask', },
'price': float(data.ask_price), 'size': float(data.ask_size)}, {
{'type': 'asize', 'type': 'bsize',
'price': float(data.ask_price), 'size': float(data.ask_size)} 'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
] ]
})) }))
sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,
channels=[TRADES, L1_BOOK], channels=[TRADES, L1_BOOK],
symbols=[piker_sym_to_cb_sym(instrument)], symbols=[sym],
callbacks={ callbacks={
TRADES: _trade, TRADES: _trade,
L1_BOOK: _l1 L1_BOOK: _l1
@ -597,9 +657,9 @@ async def maybe_open_price_feed(
async with maybe_open_context( async with maybe_open_context(
acm_func=open_price_feed, acm_func=open_price_feed,
kwargs={ kwargs={
'instrument': instrument 'instrument': instrument.split('.')[0]
}, },
key=f'{instrument}-price', key=f'{instrument.split('.')[0]}-price',
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
yield broadcast_receiver(feed, 10) yield broadcast_receiver(feed, 10)
@ -664,10 +724,10 @@ async def maybe_open_order_feed(
async with maybe_open_context( async with maybe_open_context(
acm_func=open_order_feed, acm_func=open_order_feed,
kwargs={ kwargs={
'instrument': instrument, 'instrument': instrument.split('.')[0],
'fh': fh 'fh': fh
}, },
key=f'{instrument}-order', key=f'{instrument.split('.')[0]}-order',
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
yield broadcast_receiver(feed, 10) yield broadcast_receiver(feed, 10)

View File

@ -21,18 +21,33 @@ Deribit backend.
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import Any, Optional, Callable from typing import Any, Optional, Callable
from pprint import pformat
import time import time
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import pendulum from pendulum import (
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.brokers import open_cached_client from piker.accounting import (
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import get_logger, get_console_log from piker.log import get_logger, get_console_log
from piker.data import ShmArray from piker.data import ShmArray
from piker.data.validate import FeedInit
from piker.brokers._util import ( from piker.brokers._util import (
BrokerError, BrokerError,
DataUnavailable, DataUnavailable,
@ -47,9 +62,13 @@ from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Trade, Client, Trade,
get_config, get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import (
Pair,
OptionPair,
)
_spawn_kwargs = { _spawn_kwargs = {
'infect_asyncio': True, 'infect_asyncio': True,
@ -64,36 +83,107 @@ async def open_history_client(
mkt: MktPair, mkt: MktPair,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
async def get_ohlc( async def get_ohlc(
end_dt: Optional[datetime] = None, timeframe: float,
start_dt: Optional[datetime] = None, end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
datetime, # start datetime, # start
datetime, # end datetime, # end
]: ]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars( array: np.ndarray = await client.bars(
instrument, mkt,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
raise DataUnavailable raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = pendulum.from_timestamp(array[0]['time']) start_dt = from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time']) end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield get_ohlc, {'erlangs': 3, 'rate': 3}
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
pair: Pair = await client.exch_info(
sym=pair_str,
)
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
mkt = MktPair(
dst=dst,
src=src,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
expiry=pair.expiry,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
)
return mkt, pair
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
@ -108,31 +198,26 @@ async def stream_quotes(
# XXX: required to propagate ``tractor`` loglevel to piker logging # XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel) get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0] sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan send_chan as send_chan
): ):
init_msgs = { mkt, pair = await get_mkt_info(sym)
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
)
nsym = piker_sym_to_cb_sym(sym) nsym = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream: async with maybe_open_price_feed(sym) as stream:
cache = await client.cache_symbols() cache = client._pairs
last_trades = (await client.last_trades( last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades cb_sym_to_deribit_inst(nsym), count=1)).trades
@ -174,12 +259,21 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
# load all symbols locally for fast search # load all symbols locally for fast search
cache = await client.cache_symbols() cache = client._pairs
await ctx.started() await ctx.started()
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream: async for pattern in stream:
# repack in dict form # NOTE: pattern fuzzy-matching is done within
await stream.send( # the methd impl.
await client.search_symbols(pattern)) pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)

View File

@ -0,0 +1,191 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
import pendulum
from typing import (
Literal,
)
from decimal import Decimal
from msgspec import field
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
return iso_date
@property
def venue(self) -> str:
return 'option'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
low: list[float]
cost: list[float]
high: list[float]
open: list[float]
close: list[float]
ticks: list[int]
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
amount: float
trade_id: str
contracts: float
direction: str
trade_seq: int
timestamp: int
mark_price: float
index_price: float
tick_direction: int
instrument_name: str
combo_id: Optional[str] = '',
combo_trade_id: Optional[int] = 0,
block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool

View File

@ -111,6 +111,10 @@ class KucoinMktPair(Struct, frozen=True):
quoteMaxSize: float quoteMaxSize: float
quoteMinSize: float quoteMinSize: float
symbol: str # our bs_mktid, kucoin's internal id symbol: str # our bs_mktid, kucoin's internal id
feeCategory: int
makerFeeCoefficient: float
takerFeeCoefficient: float
st: bool
class AccountTrade(Struct, frozen=True): class AccountTrade(Struct, frozen=True):
@ -593,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
''' '''
async with ( async with (
httpx.AsyncClient( httpx.AsyncClient(
base_url=f'https://api.kucoin.com/api', base_url='https://api.kucoin.com/api',
) as trio_client, ) as trio_client,
): ):
client = Client(httpx_client=trio_client) client = Client(httpx_client=trio_client)
@ -637,7 +641,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'}) await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.info('Starting ping task for kucoin ws connection') log.warning('Starting ping task for kucoin ws connection')
n.start_soon(ping_server) n.start_soon(ping_server)
yield yield
@ -649,9 +653,14 @@ async def open_ping_task(
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[MktPair, KucoinMktPair]: ) -> tuple[
MktPair,
KucoinMktPair,
]:
''' '''
Query for and return a `MktPair` and `KucoinMktPair`. Query for and return both a `piker.accounting.MktPair` and
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
''' '''
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
@ -726,6 +735,8 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}') log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols: for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str) mkt, pair = await get_mkt_info(sym_str)
init_msgs.append( init_msgs.append(
FeedInit(mkt_info=mkt) FeedInit(mkt_info=mkt)
@ -733,7 +744,11 @@ async def stream_quotes(
ws: NoBsWs ws: NoBsWs
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
connect_id = str(uuid4()) log.info('API reported ping_interval: {ping_interval}\n')
connect_id: str = str(uuid4())
typ: str
quote: dict
async with ( async with (
open_autorecon_ws( open_autorecon_ws(
( (
@ -747,20 +762,37 @@ async def stream_quotes(
), ),
) as ws, ) as ws,
open_ping_task(ws, ping_interval, connect_id), open_ping_task(ws, ping_interval, connect_id),
aclosing(stream_messages(ws, sym_str)) as msg_gen, aclosing(
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
): ):
typ, quote = await anext(msg_gen) typ, quote = await anext(iter_quotes)
while typ != 'trade':
# take care to not unblock here until we get a real # take care to not unblock here until we get a real
# trade quote # trade quote?
typ, quote = await anext(msg_gen) # ^TODO, remove this right?
# -[ ] what often blocks chart boot/new-feed switching
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
feed_is_live.set() feed_is_live.set()
async for typ, msg in msg_gen: # XXX NOTE, DO NOT include the `.<backend>` suffix!
await send_chan.send({sym_str: msg}) # OW the sampling loop will not broadcast correctly..
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm @acm
@ -815,7 +847,7 @@ async def subscribe(
) )
async def stream_messages( async def iter_normed_quotes(
ws: NoBsWs, ws: NoBsWs,
sym: str, sym: str,
@ -846,6 +878,9 @@ async def stream_messages(
yield 'trade', { yield 'trade', {
'symbol': sym, 'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price, 'last': trade_data.price,
'brokerd_ts': last_trade_ts, 'brokerd_ts': last_trade_ts,
'ticks': [ 'ticks': [
@ -938,7 +973,7 @@ async def open_history_client(
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
print( log.debug(
f'difference in time between load and processing' f'difference in time between load and processing'
f'{inow - times[-1]}' f'{inow - times[-1]}'
) )

View File

@ -653,7 +653,11 @@ class Router(Struct):
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker) book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog( async with self.maybe_open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
@ -716,7 +720,7 @@ class Router(Struct):
subs = self.subscribers[sub_key] subs = self.subscribers[sub_key]
sent_some: bool = False sent_some: bool = False
for client_stream in subs: for client_stream in subs.copy():
try: try:
await client_stream.send(msg) await client_stream.send(msg)
sent_some = True sent_some = True
@ -1010,6 +1014,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name'] status_msg.src = msg.broker_details['name']
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast( await router.client_broadcast(
status_msg.req.symbol, status_msg.req.symbol,
status_msg, status_msg,

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_arbiter( tractor.get_registry(
host=host, host=host,
port=ports[0] port=ports[0]
) as portal ) as portal

View File

@ -25,6 +25,7 @@ from collections import (
defaultdict, defaultdict,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial
import time import time
from typing import ( from typing import (
Any, Any,
@ -42,7 +43,7 @@ from tractor.trionics import (
maybe_open_nursery, maybe_open_nursery,
) )
import trio import trio
from trio_typing import TaskStatus from trio import TaskStatus
from .ticktools import ( from .ticktools import (
frame_ticks, frame_ticks,
@ -70,6 +71,7 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0 _default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler: class Sampler:
''' '''
Global sampling engine registry. Global sampling engine registry.
@ -79,9 +81,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see time-step-sample a (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below `.service.maybe_open_samplerd()` and the below
``register_with_sampler()``. `register_with_sampler()`.
''' '''
service_nursery: None | trio.Nursery = None service_nursery: None | trio.Nursery = None
@ -375,7 +377,10 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms assert Sampler.ohlcv_shms
# unblock caller # unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys())) await ctx.started(
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
if open_index_stream: if open_index_stream:
try: try:
@ -419,7 +424,6 @@ async def register_with_sampler(
async def spawn_samplerd( async def spawn_samplerd(
loglevel: str | None = None, loglevel: str | None = None,
**extra_tractor_kwargs **extra_tractor_kwargs
@ -429,7 +433,10 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting. update and increment count write and stream broadcasting.
''' '''
from piker.service import Services from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname = 'samplerd' dname = 'samplerd'
log.info(f'Spawning `{dname}`') log.info(f'Spawning `{dname}`')
@ -437,26 +444,33 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want # singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree. # one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api? # TODO: make this built-into the service api?
async with Services.locks[dname + '_singleton']: mngr: ServiceMngr = get_service_mngr()
already_started: bool = dname in mngr.service_tasks
if dname not in Services.service_tasks: async with mngr._locks[dname + '_singleton']:
ctx: Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
portal = await Services.actor_n.start_actor( # proxy-through to tractor
dname,
enable_modules=[ enable_modules=[
'piker.data._sampling', 'piker.data._sampling',
], ],
loglevel=loglevel, loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs **extra_tractor_kwargs
) )
if not already_started:
await Services.start_service_task( assert (
dname, ctx
portal, and
register_with_sampler, ctx.portal
period_s=1, and
sub_for_broadcasts=False, not ctx.cancel_called
) )
return True return True
@ -889,6 +903,7 @@ async def uniform_rate_send(
# to consumers which crash or lose network connection. # to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to # I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors! # ``pikerd`` these kinds of errors!
trio.EndOfChannel,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
ConnectionResetError, ConnectionResetError,

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set() nobsws._connected.set()
await trio.sleep_forever() await trio.sleep_forever()
except HandshakeError: except HandshakeError:
log.exception(f'Retrying connection') log.exception('Retrying connection')
# ws & nursery block ends # ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
''' '''
JSONRPC response-request style machinery for transparent multiplexing of msgs JSONRPC response-request style machinery for transparent multiplexing
over a NoBsWs. of msgs over a NoBsWs.
''' '''
@ -377,16 +377,20 @@ async def open_jsonrpc_session(
url: str, url: str,
start_id: int = 0, start_id: int = 0,
response_type: type = JSONRPCResult, response_type: type = JSONRPCResult,
request_type: Optional[type] = None, # request_type: Optional[type] = None,
request_hook: Optional[Callable] = None, # request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None, # error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]: ) -> Callable[[str, dict], dict]:
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_autorecon_ws(url) as ws open_autorecon_ws(url) as ws
): ):
rpc_id: Iterable = count(start_id) rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {} rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict: async def json_rpc(method: str, params: dict) -> dict:
@ -394,27 +398,41 @@ async def open_jsonrpc_session(
perform a json rpc call and wait for the result, raise exception in perform a json rpc call and wait for the result, raise exception in
case of error field present on response case of error field present on response
''' '''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = { msg = {
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'id': next(rpc_id), 'id': req_id,
'method': method, 'method': method,
'params': params 'params': params
} }
_id = msg['id'] _id = msg['id']
rpc_results[_id] = { result = rpc_results[_id] = {
'result': None, 'result': None,
'event': trio.Event() 'error': None,
'event': trio.Event(), # signal caller resp arrived
} }
req_msgs[_id] = msg
await ws.send_msg(msg) await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait() await rpc_results[_id]['event'].wait()
ret = rpc_results[_id]['result'] if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id] del rpc_results[_id]
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None: if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4)) raise Exception(json.dumps(ret.error, indent=4))
@ -428,6 +446,7 @@ async def open_jsonrpc_session(
the server side. the server side.
''' '''
nonlocal req_msgs
async for msg in ws: async for msg in ws:
match msg: match msg:
case { case {
@ -451,15 +470,29 @@ async def open_jsonrpc_session(
'params': _, 'params': _,
}: }:
log.debug(f'Recieved\n{msg}') log.debug(f'Recieved\n{msg}')
if request_hook: # if request_hook:
await request_hook(request_type(**msg)) # await request_hook(request_type(**msg))
case { case {
'error': error 'error': error
}: }:
log.warning(f'Recieved\n{error}') # if error_hook:
if error_hook: # await error_hook(response_type(**msg))
await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = msg['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _: case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

View File

@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere? => TODO: maybe to (re)move elsewhere?
''' '''
from ._mngr import Services as Services from ._mngr import (
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._registry import ( from ._registry import (
_tractor_kwargs as _tractor_kwargs, _tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr, _default_reg_addr as _default_reg_addr,

View File

@ -21,7 +21,6 @@
from __future__ import annotations from __future__ import annotations
import os import os
from typing import ( from typing import (
Optional,
Any, Any,
ClassVar, ClassVar,
) )
@ -30,13 +29,13 @@ from contextlib import (
) )
import tractor import tractor
import trio
from ._util import ( from ._util import (
get_console_log, get_console_log,
) )
from ._mngr import ( from ._mngr import (
Services, open_service_mngr,
ServiceMngr,
) )
from ._registry import ( # noqa from ._registry import ( # noqa
_tractor_kwargs, _tractor_kwargs,
@ -59,7 +58,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [], registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: Optional[str] = None, loglevel: str|None = None,
# XXX NOTE XXX: you should pretty much never want debug mode # XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production. # for data daemons when running in production.
@ -69,7 +68,7 @@ async def open_piker_runtime(
# and spawn the service tree distributed per that. # and spawn the service tree distributed per that.
start_method: str = 'trio', start_method: str = 'trio',
tractor_runtime_overrides: dict | None = None, tractor_runtime_overrides: dict|None = None,
**tractor_kwargs, **tractor_kwargs,
) -> tuple[ ) -> tuple[
@ -119,6 +118,10 @@ async def open_piker_runtime(
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=enable_modules, enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs, **tractor_kwargs,
) as actor, ) as actor,
@ -167,12 +170,13 @@ async def open_pikerd(
**kwargs, **kwargs,
) -> Services: ) -> ServiceMngr:
''' '''
Start a root piker daemon with an indefinite lifetime. Start a root piker daemon actor (aka `pikerd`) with an indefinite
lifetime.
A root actor nursery is created which can be used to create and keep A root actor-nursery is created which can be used to spawn and
alive underling services (see below). supervise underling service sub-actors (see below).
''' '''
# NOTE: for the root daemon we always enable the root # NOTE: for the root daemon we always enable the root
@ -199,8 +203,6 @@ async def open_pikerd(
root_actor, root_actor,
reg_addrs, reg_addrs,
), ),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
): ):
for addr in reg_addrs: for addr in reg_addrs:
if addr not in root_actor.accept_addrs: if addr not in root_actor.accept_addrs:
@ -209,25 +211,17 @@ async def open_pikerd(
'Maybe you have another daemon already running?' 'Maybe you have another daemon already running?'
) )
# assign globally for future daemon/task creation mngr: ServiceMngr
Services.actor_n = actor_nursery async with open_service_mngr(
Services.service_n = service_nursery debug_mode=debug_mode,
Services.debug_mode = debug_mode ) as mngr:
yield mngr
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
# TODO: do we even need this? # TODO: do we even need this?
# @acm # @acm
# async def maybe_open_runtime( # async def maybe_open_runtime(
# loglevel: Optional[str] = None, # loglevel: str|None = None,
# **kwargs, # **kwargs,
# ) -> None: # ) -> None:
@ -256,7 +250,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None, loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal | ClassVar[Services]: ) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
''' '''
If no ``pikerd`` daemon-root-actor can be found start it and If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout, ReadTimeout,
) )
from ._mngr import Services from ._mngr import ServiceMngr
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm @acm
async def start_ahab_service( async def start_ahab_service(
services: Services, services: ServiceMngr,
service_name: str, service_name: str,
# endpoint config passed as **kwargs # endpoint config passed as **kwargs
@ -549,7 +549,8 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container') log.warning('Failed to cancel root permsed container')
except ( except (
trio.MultiError, # trio.MultiError,
ExceptionGroup,
) as err: ) as err:
for subexc in err.exceptions: for subexc in err.exceptions:
if isinstance(subexc, PermissionError): if isinstance(subexc, PermissionError):

View File

@ -26,14 +26,17 @@ from typing import (
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from collections import defaultdict
import tractor import tractor
import trio
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
) )
from ._mngr import ( from ._mngr import (
Services, get_service_mngr,
ServiceMngr,
) )
from ._actor_runtime import maybe_open_pikerd from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service from ._registry import find_service
@ -41,15 +44,14 @@ from ._registry import find_service
@acm @acm
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
service_name: str, service_name: str,
service_task_target: Callable, service_task_target: Callable,
spawn_args: dict[str, Any], spawn_args: dict[str, Any],
loglevel: str | None = None, loglevel: str | None = None,
singleton: bool = False, singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs, **pikerd_kwargs,
) -> tractor.Portal: ) -> tractor.Portal:
@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
''' '''
# serialize access to this section to avoid # serialize access to this section to avoid
# 2 or more tasks racing to create a daemon # 2 or more tasks racing to create a daemon
lock = Services.locks[service_name] lock = _locks[service_name]
await lock.acquire() await lock.acquire()
async with find_service( async with find_service(
@ -132,7 +134,65 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal: async with tractor.wait_for_actor(service_name) as portal:
lock.release() lock.release()
yield portal yield portal
await portal.cancel_actor() # --- ---- ---
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
async def spawn_emsd( async def spawn_emsd(
@ -147,21 +207,22 @@ async def spawn_emsd(
""" """
log.info('Spawning emsd') log.info('Spawning emsd')
portal = await Services.actor_n.start_actor( smngr: ServiceMngr = get_service_mngr()
portal = await smngr.actor_n.start_actor(
'emsd', 'emsd',
enable_modules=[ enable_modules=[
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
], ],
loglevel=loglevel, loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag debug_mode=smngr.debug_mode, # set by pikerd flag
**extra_tractor_kwargs **extra_tractor_kwargs
) )
# non-blocking setup of clearing service # non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd from ..clearing._ems import _setup_persistent_emsd
await Services.start_service_task( await smngr.start_service_task(
'emsd', 'emsd',
portal, portal,

View File

@ -18,16 +18,29 @@
daemon-service management API. daemon-service management API.
""" """
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import ( from typing import (
Callable, Callable,
Any, Any,
) )
import trio import msgspec
from trio_typing import TaskStatus
import tractor import tractor
import trio
from trio import TaskStatus
from tractor import ( from tractor import (
ActorNursery,
current_actor, current_actor,
ContextCancelled, ContextCancelled,
Context, Context,
@ -39,6 +52,130 @@ from ._util import (
) )
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln: # TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the # - factor this into a ``tractor.highlevel`` extension # pack for the
# library. # library.
@ -46,31 +183,46 @@ from ._util import (
# to the pikerd actor for starting services remotely! # to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns # - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion? # new actors and supervises them to completion?
class Services: @dataclass
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
'''
A multi-subactor-as-service manager.
actor_n: tractor._supervise.ActorNursery Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[ service_tasks: dict[
str, str,
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
Portal, Portal,
trio.Event, trio.Event,
] ]
] = {} ] = field(default_factory=dict)
locks = defaultdict(trio.Lock)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task( async def start_service_task(
self, self,
name: str, name: str,
portal: Portal, portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
target: Callable, target: Callable,
allow_overruns: bool = False, allow_overruns: bool = False,
**ctx_kwargs, **ctx_kwargs,
) -> (trio.CancelScope, Context): ) -> (trio.CancelScope, Context, Any):
''' '''
Open a context in a service sub-actor, add to a stack Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown. that gets unwound at ``pikerd`` teardown.
@ -83,6 +235,7 @@ class Services:
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
trio.Event, trio.Event,
Any, Any,
] ]
@ -90,64 +243,87 @@ class Services:
) -> Any: ) -> Any:
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
try:
async with portal.open_context( async with portal.open_context(
target, target,
allow_overruns=allow_overruns, allow_overruns=allow_overruns,
**ctx_kwargs, **ctx_kwargs,
) as (ctx, first): ) as (ctx, started):
# unblock once the remote context has started # unblock once the remote context has started
complete = trio.Event() complete = trio.Event()
task_status.started((cs, complete, first)) task_status.started((
cs,
ctx,
complete,
started,
))
log.info( log.info(
f'`pikerd` service {name} started with value {first}' f'`pikerd` service {name} started with value {started}'
) )
try:
# wait on any context's return value # wait on any context's return value
# and any final portal result from the # and any final portal result from the
# sub-actor. # sub-actor.
ctx_res: Any = await ctx.result() ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled # NOTE: blocks indefinitely until cancelled
# either by error from the target context # either by error from the target context
# function or by being cancelled here by the # function or by being cancelled here by the
# surrounding cancel scope. # surrounding cancel scope.
return (await portal.result(), ctx_res) return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe: except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid our_uid: tuple[str, str] = current_actor().uid
if ( if (
canceller != portal.channel.uid canceller != portal.chan.uid
and and
canceller != our_uid canceller != our_uid
): ):
log.cancel( log.cancel(
f'Actor-service {name} was remotely cancelled?\n' f'Actor-service `{name}` was remotely cancelled by a peer?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n' # TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
) )
else: else:
raise raise
finally: finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor() await portal.cancel_actor()
complete.set() complete.set()
self.service_tasks.pop(name) self.service_tasks.pop(name)
cs, complete, first = await self.service_n.start(open_context_in_task) cs, sub_ctx, complete, started = await self.service_n.start(
open_context_in_task
)
# store the cancel scope and portal for later cancellation or # store the cancel scope and portal for later cancellation or
# retstart if needed. # retstart if needed.
self.service_tasks[name] = (cs, portal, complete) self.service_tasks[name] = (cs, sub_ctx, portal, complete)
return cs, sub_ctx, started
return cs, first
@classmethod
async def cancel_service( async def cancel_service(
self, self,
name: str, name: str,
@ -158,8 +334,80 @@ class Services:
''' '''
log.info(f'Cancelling `pikerd` service {name}') log.info(f'Cancelling `pikerd` service {name}')
cs, portal, complete = self.service_tasks[name] cs, sub_ctx, portal, complete = self.service_tasks[name]
cs.cancel()
# cs.cancel()
await sub_ctx.cancel()
await complete.wait() await complete.wait()
assert name not in self.service_tasks, \
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Serice task for {name} not terminated?' f'Serice task for {name} not terminated?'
)
# assert name not in self.service_tasks, \
# f'Serice task for {name} not terminated?'
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
debug_mode: bool = False,
**tractor_actor_kwargs,
) -> Context:
'''
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
indefinitely.
Services can be cancelled/shutdown using `.cancel_service()`.
'''
entry: tuple|None = self.service_tasks.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_tasks:
portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**tractor_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(cs, sub_ctx, started) = await self.start_service_task(
daemon_name,
portal,
ctx_ep,
**ctx_kwargs,
)
return sub_ctx
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import (
# open_service_mngr,
# ServiceMngr,
# )
# mngr: ServiceMngr|None = None
# with tractor.hilevel.open_service_mngr() as mngr:
# Services = proxy(mngr)

View File

@ -21,11 +21,13 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
# TODO: oof, needs to be changed to `httpx`!
import asks import asks
if TYPE_CHECKING: if TYPE_CHECKING:
import docker import docker
from ._ahab import DockerContainer from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger from ._util import log # sub-sys logger
from ._util import ( from ._util import (
@ -127,7 +129,7 @@ def start_elasticsearch(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: Services, service_mngr: ServiceMngr,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc # import purerpc
from ..data.feed import maybe_open_feed from ..data.feed import maybe_open_feed
from . import Services from . import ServiceMngr
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: Services, service_mngr: ServiceMngr,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -458,13 +458,15 @@ async def start_backfill(
'bf_until <- last_start_dt:\n' 'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n' f'{backfill_until_dt} <- {last_start_dt}\n'
) )
# UGH: what's a better way?
# ugh, what's a better way? # TODO: backends are responsible for being correct on
# TODO: fwiw, we probably want a way to signal a throttle # this right!?
# condition (eg. with ib) so that we can halt the # -[ ] in the `ib` case we could maybe offer some way
# request loop until the condition is resolved? # to halt the request loop until the condition is
if timeframe > 1: # resolved or should the backend be entirely in
await tractor.pause() # charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return return
assert ( assert (
@ -578,6 +580,7 @@ async def start_backfill(
'crypto', 'crypto',
'crypto_currency', 'crypto_currency',
'fiat', # a "forex pair" 'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}: }:
# for now, our table key schema is not including # for now, our table key schema is not including
# the dst[/src] source asset token. # the dst[/src] source asset token.

2326
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -50,10 +50,8 @@ attrs = "^23.1.0"
bidict = "^0.22.1" bidict = "^0.22.1"
colorama = "^0.4.6" colorama = "^0.4.6"
colorlog = "^6.7.0" colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86" ib-insync = "^0.9.86"
msgspec = "^0.18.0" msgspec = "^0.18.6"
numba = "^0.59.0" numba = "^0.59.0"
numpy = "^1.25" numpy = "^1.25"
polars = "^0.18.13" polars = "^0.18.13"
@ -71,11 +69,13 @@ pdbp = "^1.5.0"
trio = "^0.24" trio = "^0.24"
pendulum = "^3.0.0" pendulum = "^3.0.0"
httpx = "^0.27.0" httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor] [tool.poetry.dependencies.tractor]
develop = true develop = true
git = 'https://github.com/goodboy/tractor.git' git = 'https://pikers.dev/goodboy/tractor.git'
branch = 'asyncio_debugger_support' branch = 'aio_abandons'
# path = "../tractor" # path = "../tractor"
[tool.poetry.dependencies.asyncvnc] [tool.poetry.dependencies.asyncvnc]
@ -109,6 +109,8 @@ pytest = "^6.0.0"
elasticsearch = "^8.9.0" elasticsearch = "^8.9.0"
xonsh = "^0.14.2" xonsh = "^0.14.2"
prompt-toolkit = "3.0.40" prompt-toolkit = "3.0.40"
cython = "^3.0.0"
greenback = "^1.1.1"
# console ehancements and eventually remote debugging # console ehancements and eventually remote debugging
# extras/helpers. # extras/helpers.

View File

@ -10,7 +10,7 @@ from piker import (
config, config,
) )
from piker.service import ( from piker.service import (
Services, get_service_mngr,
) )
from piker.log import get_console_log from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager, ) as service_manager,
): ):
# this proc/actor is the pikerd # this proc/actor is the pikerd
assert service_manager is Services assert service_manager is get_service_mngr()
async with tractor.wait_for_actor( async with tractor.wait_for_actor(
'pikerd', 'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor import tractor
from uuid import uuid4 from uuid import uuid4
from piker.service import Services from piker.service import ServiceMngr
from piker.log import get_logger from piker.log import get_logger
from piker.clearing._messages import ( from piker.clearing._messages import (
Order, Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker( def test_ems_err_on_bad_broker(
open_test_pikerd: Services, open_test_pikerd: ServiceMngr,
loglevel: str, loglevel: str,
): ):
async def load_bad_fqme(): async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import ( from piker.service import (
find_service, find_service,
Services, ServiceMngr,
) )
from piker.data import ( from piker.data import (
open_feed, open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main(): async def main():
port = 6666 port = 6666
daemon_addr = ('127.0.0.1', port) daemon_addr = ('127.0.0.1', port)
services: Services services: ServiceMngr
async with ( async with (
open_test_pikerd( open_test_pikerd(