Compare commits

...

46 Commits

Author SHA1 Message Date
Nelson Torres 6555ccfbba config refactor
only one get_config method for api class and cryptofeed feed handler
2024-11-15 15:24:08 -03:00
Nelson Torres 75d1d007fb move constants to venue 2024-11-15 14:41:47 -03:00
Nelson Torres 2bdbe0f20e refactor redundant code 2024-11-15 14:26:16 -03:00
Nelson Torres a117177759 name formatting fixes 2024-11-15 11:23:05 -03:00
Nelson Torres 30060a83c9 get_mkt_info cleanup 2024-11-15 11:22:27 -03:00
Nelson Torres 156a35b606 cache_symbols refactor 2024-11-15 11:20:48 -03:00
Nelson Torres 89e241c132 json_rpc_auth_wrapper 2024-11-15 11:18:55 -03:00
Nelson Torres df8d1274ae move object classes to venue 2024-11-15 11:15:25 -03:00
Nelson Torres 0916b707e2 Added options symbols to get_assets 2024-11-14 17:39:52 -03:00
Tyler Goodlet 45788b0b53 .clearing._ems: Don't require `first_quote['last']`
Instead just check for the field (which i'm not huge on the key-name for
anyway) and if not found get the "last price" from the real-time shm
buffer's latest 'close' sample.

Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop
such that if a `client_stream` is popped on connection failure, we don't
RTE for the "size changed on iteration".
2024-11-13 13:50:43 +00:00
Tyler Goodlet 38a1f0b9ee `kucoin`: repair live quotes streaming..
This must have broke at some point during the new `MktPair` and thus
`.fqme: str` updates; mas-o-menos the symbol key in the quote-msg-`dict`
was NOT set to the `MktPair.bs_fqme: str` value and thus wasn't being
processed by the downstream sampling and feed subsys.

So fix that as well as a few other refinements,
- set the `topic: mkt.bs_fqme` in quote msgs obvi.
- drop the "wait for first clearing vlm" quote poll loop; going to fix
  the sampler to handle a `first_quote` without a `'last'` key.
- add some typing around calls to `get_mkt_info()`.
- rename `stream_messages()` -> `iter_normed_quotes()`.
2024-11-13 13:50:16 +00:00
Nelson Torres f291654dbe Deleted settlePlan field from binance FutesPair. 2024-11-13 13:49:49 +00:00
Nelson Torres e9fa422916 Added missing fields for kucoin.
feeCategory, makerFeeCoefficient, takerFeeCoefficient and st.
2024-11-13 13:49:32 +00:00
Nelson Torres 5304a36b87 get_assets now uses public endpoint
It's better if the data is available through a public endpoint.
2024-11-13 10:43:30 -03:00
Nelson Torres 089c79e905 now using exch_info in search_symbols 2024-11-13 10:40:05 -03:00
Nelson Torres d848050b52 Fix bs_fqme using venue and expiry 2024-11-12 16:07:46 -03:00
Nelson Torres ddffe2bec6 Added expiry property for OptionPair 2024-11-12 16:06:59 -03:00
Nelson Torres 19b4ca9d85 No longer needed 2024-11-12 16:05:54 -03:00
Nelson Torres f037f851d8 bs_mktid instead bs_fqme for deribits options 2024-11-12 12:22:28 -03:00
Nelson Torres a3ab8dd8fe Fixed pair instrument name in search_symbols endpoint.
Fixed instrument in bars endpoint, for options in deribits bs_mktid instead bs_fqme.
Fixed the id is in msg.
2024-11-12 12:16:07 -03:00
Tyler Goodlet 6fa0d4bcf3 data._web_bs: try to raise jsonrpc errors in parent task 2024-11-11 13:07:39 +00:00
Nelson Torres a4f7fa9c1a Add necessary classes in init file for deribit 2024-11-08 21:58:45 +00:00
Nelson Torres 266ecf6206 Minor refactor in open_symbol_search 2024-11-08 21:58:06 +00:00
Nelson Torres ea6126d310 stream_quotes now using FeedInit 2024-11-08 21:57:35 +00:00
Nelson Torres 1f4a5b80c4 symbol_info refactor 2024-11-08 21:53:10 +00:00
Nelson Torres ac6f52088a search_symbols output type fix 2024-11-08 21:48:28 +00:00
Nelson Torres 960298514c add get_mkt_pairs method 2024-11-08 21:47:24 +00:00
Nelson Torres 71f3a0a4cd get_assets refactor 2024-11-08 21:46:58 +00:00
Nelson Torres b25a7699ab formatting 2024-11-08 21:45:27 +00:00
Nelson Torres b39affc96e created exch_info in api class 2024-11-08 21:42:42 +00:00
Nelson Torres be8629929b modify self_pairs type to ChainMap 2024-11-08 21:41:52 +00:00
Nelson Torres 4776be6736 Necessary imports 2024-11-08 21:38:08 +00:00
Nelson Torres 008e68174b add get_market_info 2024-11-08 21:36:23 +00:00
Nelson Torres b4a9b86783 Necessary imports 2024-11-08 21:35:09 +00:00
Nelson Torres d3ca571c0e minor fixes in venues 2024-11-08 21:34:03 +00:00
Nelson Torres b3bbef30c0 add class Pair in venues, PAIRTYPES for future uses 2024-11-06 14:45:11 +00:00
Nelson Torres 499b2d0090 fix syms for venues.
little refactor in get_config, and created get_fh_config for cryptofeed.
2024-11-04 19:36:16 +00:00
Nelson Torres 8b0f1e7045 venues for deribit 2024-11-04 09:34:21 -03:00
Nelson Torres b2cfa3444f Added cryptofeed and pyarrow necessary for the feed, enable deribit
in the brokers init file, at this point the feed is working, to check
the tables use vd tool.
2024-08-28 23:58:48 +00:00
Nelson Torres 0be454c3d6 Updated tractor method name. 2024-08-23 18:06:05 +00:00
Tyler Goodlet de6189da4d Drop `.cancel_actor()` from `maybe_spawn_daemon()`
Since `tractor`'s new and improved inter-actor cancellation semantics
are much more pedantic, AND bc we use the `ServiceMngr` for spawning
service actors on-demand, the caller of `maybe_spawn_daemon()` should
NEVER conduct a so called "out of band" `Actor`-runtime cancel request
since this is precisely the job of our `ServiceMngr` XD

Add a super in depth note explaining the underlying issue and adding
a todo list of how we should prolly augment `tractor` to make such cases
easier to grok and fix in the future!
2024-08-21 17:37:40 +00:00
Tyler Goodlet cc5b21a7e6 Prep service mngr for move to `tractor.hilevel`
Given it's a fairly simple yet useful abstraction, it makes sense to
offer this sub-sys alongside the core `tractor` runtime lib.

Without going into extreme detail on the impl changes (it'll come in
the commit that moves to the other repo) here is the high level summary:
------ - ------
- rename `Services` -> `ServiceMngr` and use an factory `@acm`
  to guarantee a single-instance-per-actor using a niche approach for a
  singleton object using a default keyword-arg B)
  - the mod level `open_service_mngr()` and `get_service_mngr()` are the
    new allocation/access API.
- add a `ServiceMngr.start_service()` method which does the work of both
  spawning a new subactor (for the daemon) and uses its portal to start
  the mngr side supervision task.
- open actor/task nurseries inside the `@acm` allocator.

Adjust other dependent subsystems to match:
------ - ------
- use `open_service_mngr()` when first allocated in `open_pikerd()`.
- use `get_service_mngr()` instead of importing the class ref inside
  `.service.maybe_spawn_daemon()`, `.brokers._daemon.spawn_brokerd()`
  and `.data._sampling.spawn_samplerd()` using a `partial` to pack in
  the endpoint ctx kwargs (unpacked inside `.start_service()` XD).
2024-08-20 23:01:51 -03:00
Nelson Torres 35a9d8ec9d tractor branch updated, msgspec version upgraded, cython and greenback dependencies moved under dev group 2024-08-14 23:22:30 +00:00
Nelson Torres a831212c86 default.nix file for qt6 uis 2024-08-14 20:17:14 -03:00
Tyler Goodlet e987d7d7c4 Lel, forgot to add a `SPOT` venue for `binance`.. 2024-08-14 22:51:23 +00:00
Tyler Goodlet 5ec756234a Mask no-data pause-point, add perps to sig.
Was orig for debugging an issue with `kucoin` i think but definitely
shouldn't be left in XD

Also add `'perpetual_future'` to `start_backfill()` input literal set.
2024-08-14 19:35:24 -03:00
27 changed files with 3204 additions and 864 deletions

82
default.nix 100644
View File

@ -0,0 +1,82 @@
with (import <nixpkgs> {});
with python312Packages;
let
glibStorePath = lib.getLib glib;
qtpyStorePath = lib.getLib qtpy;
pyqt6StorePath = lib.getLib pyqt6;
pyqt6SipStorePath = lib.getLib pyqt6-sip;
qt6baseStorePath = lib.getLib qt6.qtbase;
rapidfuzzStorePath = lib.getLib rapidfuzz;
qdarkstyleStorePath = lib.getLib qdarkstyle;
in
stdenv.mkDerivation {
name = "piker-qt6-poetry-shell";
buildInputs = [
# System requirements.
glib
qt6.qtbase
libgcc.lib
# Python requirements.
python312Full
poetry-core
qdarkstyle
rapidfuzz
pyqt6
qtpy
];
src = null;
shellHook = ''
set -e
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${libgcc.lib}/lib:${glibStorePath}/lib
# Set the Qt plugin path
# export QT_DEBUG_PLUGINS=1
QTBASE_PATH="${qt6baseStorePath}"
echo "qtbase path: $QTBASE_PATH"
echo ""
export QT_PLUGIN_PATH="$QTBASE_PATH/lib/qt-6/plugins"
export QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
echo "qt plugin path: $QT_PLUGIN_PATH"
echo ""
# Maybe create venv & install deps
poetry install --with uis
# Use pyqt6 from System, patch activate script
ACTIVATE_SCRIPT_PATH="$(poetry env info --path)/bin/activate"
export RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
export QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
export QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
export PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
export PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
echo "rapidfuzz at: $RPDFUZZ_PATH"
echo "qdarkstyle at: $QDRKSTYLE_PATH"
echo "qtpy at: $QTPY_PATH"
echo "pyqt6 at: $PYQT6_PATH"
echo "pyqt6-sip at: $PYQT6_SIP_PATH"
echo ""
PATCH="export PYTHONPATH=\""
PATCH="$PATCH\$RPDFUZZ_PATH"
PATCH="$PATCH:\$QDRKSTYLE_PATH"
PATCH="$PATCH:\$QTPY_PATH"
PATCH="$PATCH:\$PYQT6_PATH"
PATCH="$PATCH:\$PYQT6_SIP_PATH"
PATCH="$PATCH\""
if grep -q "$PATCH" "$ACTIVATE_SCRIPT_PATH"; then
echo "venv is already patched."
else
echo "patching $ACTIVATE_SCRIPT_PATH to use pyqt6 from nixos..."
sed -i "\$i$PATCH" $ACTIVATE_SCRIPT_PATH
fi
poetry shell
'';
}

View File

@ -51,6 +51,7 @@ __brokers__: list[str] = [
'ib',
'kraken',
'kucoin',
'deribit',
# broken but used to work
# 'questrade',
@ -61,7 +62,6 @@ __brokers__: list[str] = [
# wstrade
# iex
# deribit
# bitso
]

View File

@ -23,6 +23,7 @@ from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from functools import partial
from types import ModuleType
from typing import (
TYPE_CHECKING,
@ -190,14 +191,17 @@ def broker_init(
async def spawn_brokerd(
brokername: str,
loglevel: str | None = None,
**tractor_kwargs,
) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon')
@ -217,27 +221,35 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery
from piker.service import Services
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
debug_mode=Services.debug_mode,
**tractor_kwargs
from piker.service import (
get_service_mngr,
ServiceMngr,
)
# NOTE: the service mngr expects an already spawned actor + its
# portal ref in order to do non-blocking setup of brokerd
# service nursery.
await Services.start_service_task(
dname,
portal,
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
mngr: ServiceMngr = get_service_mngr()
ctx: tractor.Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
# signature of target root-task endpoint
daemon_fixture_ep,
# passed to daemon_fixture_ep(**kwargs)
brokername=brokername,
loglevel=loglevel,
),
debug_mode=mngr.debug_mode,
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
**tractor_kwargs
)
assert (
not ctx.cancel_called
and ctx.portal # parent side
and dname in ctx.chan.uid # subactor is named as desired
)
return True
@ -262,8 +274,7 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon(
f'brokerd.{brokername}',
service_name=f'brokerd.{brokername}',
service_task_target=spawn_brokerd,
spawn_args={
'brokername': brokername,

View File

@ -567,6 +567,7 @@ class Client:
) -> str:
return {
'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..?
}[pair.venue]

View File

@ -181,7 +181,6 @@ class FutesPair(Pair):
quoteAsset: str # 'USDT',
quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'],

View File

@ -25,6 +25,7 @@ from .api import (
get_client,
)
from .feed import (
get_mkt_info,
open_history_client,
open_symbol_search,
stream_quotes,
@ -34,15 +35,20 @@ from .feed import (
# open_trade_dialog,
# norm_trade_records,
# )
from .venues import (
OptionPair,
)
log = get_logger(__name__)
__all__ = [
'get_client',
# 'trades_dialogue',
'get_mkt_info',
'open_history_client',
'open_symbol_search',
'stream_quotes',
'OptionPair',
# 'norm_trade_records',
]

View File

@ -19,10 +19,14 @@ Deribit backend.
'''
import asyncio
from collections import ChainMap
from contextlib import (
asynccontextmanager as acm,
)
from datetime import datetime
from decimal import (
Decimal,
)
from functools import partial
import time
from typing import (
@ -31,7 +35,7 @@ from typing import (
Callable,
)
import pendulum
from pendulum import now
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
@ -51,7 +55,25 @@ from cryptofeed.defines import (
OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from .venues import (
_ws_url,
MarketType,
PAIRTYPES,
Pair,
OptionPair,
JSONRPCResult,
JSONRPCChannel,
KLinesResult,
Trade,
LastTradesResult,
)
from piker.accounting import (
Asset,
digits_to_dec,
MktPair,
)
from piker.data import (
def_iohlcv_fields,
match_from_pairs,
@ -74,57 +96,6 @@ _spawn_kwargs = {
}
_url = 'https://www.deribit.com'
_ws_url = 'wss://www.deribit.com/ws/api/v2'
_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int
result: Optional[list[dict]] = None
error: Optional[dict] = None
usIn: int
usOut: int
usDiff: int
testnet: bool
class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str
params: dict
class KLinesResult(Struct):
close: list[float]
cost: list[float]
high: list[float]
low: list[float]
open: list[float]
status: str
ticks: list[int]
volume: list[float]
class Trade(Struct):
trade_seq: int
trade_id: str
timestamp: int
tick_direction: int
price: float
mark_price: float
iv: float
instrument_name: str
index_price: float
direction: str
combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '',
amount: float
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool
# convert datetime obj timestamp to unixtime in milliseconds
def deribit_timestamp(when):
return int((when.timestamp() * 1000) + (when.microsecond / 1000))
@ -142,13 +113,15 @@ def str_to_cb_sym(name: str) -> Symbol:
else:
raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
return Symbol(
base, quote,
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date,
expiry_normalize=False)
expiry_date=new_expiry_date)
def piker_sym_to_cb_sym(name: str) -> Symbol:
@ -165,77 +138,132 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
raise Exception("Couldn\'t parse option type")
return Symbol(
base, quote,
base=base,
quote=quote,
type=OPTION,
strike_price=strike_price,
option_type=option_type,
expiry_date=expiry_date.upper())
expiry_date=expiry_date)
def cb_sym_to_deribit_inst(sym: Symbol):
# cryptofeed normalized
cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
# deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
# YYMDD
# 01234
year, month, day = (
exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
otype = 'C' if sym.option_type == CALL else 'P'
return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
# deribit specific
cb_norm = [
'F', 'G', 'H', 'J',
'K', 'M', 'N', 'Q',
'U', 'V', 'X', 'Z'
]
months = [
'JAN', 'FEB', 'MAR', 'APR',
'MAY', 'JUN', 'JUL', 'AUG',
'SEP', 'OCT', 'NOV', 'DEC'
]
# YYMDD
# 01234
day, month, year = (
expiry_date[3:],
months[cb_norm.index(expiry_date[2:3])],
expiry_date[:2]
)
return f'{day}{month}{year}'
def get_config() -> dict[str, Any]:
conf, path = config.load()
conf: dict
path: Path
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section = conf.get('deribit')
# TODO: document why we send this, basically because logging params for cryptofeed
conf['log'] = {}
conf['log']['disabled'] = True
if section is None:
log.warning(f'No config section found for deribit in {path}')
return {}
return conf
conf_option = section.get('option', {})
section.clear # clear the dict to reuse it
section['deribit'] = {}
section['deribit']['key_id'] = conf_option.get('api_key')
section['deribit']['key_secret'] = conf_option.get('api_secret')
section['log'] = {}
section['log']['filename'] = 'feedhandler.log'
section['log']['level'] = 'DEBUG'
return section
class Client:
def __init__(self, json_rpc: Callable) -> None:
self._pairs: dict[str, Any] = None
def __init__(
self,
json_rpc: Callable
) -> None:
self._pairs: ChainMap[str, Pair] = ChainMap()
config = get_config().get('deribit', {})
if ('key_id' in config) and ('key_secret' in config):
self._key_id = config['key_id']
self._key_secret = config['key_secret']
else:
self._key_id = None
self._key_secret = None
self._key_id = config.get('key_id')
self._key_secret = config.get('key_secret')
self.json_rpc = json_rpc
@property
def currencies(self):
return ['btc', 'eth', 'sol', 'usd']
self._auth_ts = None
self._auth_renew_ts = 5 # seconds to renew auth
async def get_balances(self, kind: str = 'option') -> dict[str, float]:
async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult:
"""Background task that adquires a first access token and then will
refresh the access token.
https://docs.deribit.com/?python#authentication-2
"""
access_scope = 'trade:read_write'
current_ts = time.time()
if not self._auth_ts or current_ts - self._auth_ts < self._auth_renew_ts:
# if we are close to token expiry time
params = {
'grant_type': 'client_credentials',
'client_id': self._key_id,
'client_secret': self._key_secret,
'scope': access_scope
}
resp = await self.json_rpc('public/auth', params)
result = resp.result
self._auth_ts = time.time() + result['expires_in']
return await self.json_rpc(*args, **kwargs)
async def get_balances(
self,
kind: str = 'option'
) -> dict[str, float]:
"""Return the set of positions for this account
by symbol.
"""
balances = {}
for currency in self.currencies:
resp = await self.json_rpc(
resp = await self._json_rpc_auth_wrapper(
'private/get_positions', params={
'currency': currency.upper(),
'kind': kind})
@ -244,20 +272,46 @@ class Client:
return balances
async def get_assets(self) -> dict[str, float]:
async def get_assets(
self,
venue: str | None = None,
) -> dict[str, Asset]:
"""Return the set of asset balances for this account
by symbol.
"""
balances = {}
assets = {}
resp = await self._json_rpc_auth_wrapper(
'public/get_currencies',
params={}
)
currencies = resp.result
for currency in currencies:
name = currency['currency']
tx_tick = digits_to_dec(currency['fee_precision'])
atype='crypto_currency'
assets[name] = Asset(
name=name,
atype=atype,
tx_tick=tx_tick)
for currency in self.currencies:
resp = await self.json_rpc(
'private/get_account_summary', params={
'currency': currency.upper()})
instruments = await self.symbol_info(currency=name)
for instrument in instruments:
pair = instruments[instrument]
assets[pair.symbol] = Asset(
name=pair.symbol,
atype=pair.venue,
tx_tick=pair.size_tick)
balances[currency] = resp.result['balance']
return assets
return balances
async def get_mkt_pairs(self) -> dict[str, Pair]:
flat: dict[str, Pair] = {}
for key in self._pairs:
item = self._pairs.get(key)
flat[item.bs_fqme] = item
return flat
async def submit_limit(
self,
@ -274,7 +328,7 @@ class Client:
'type': 'limit',
'price': price,
}
resp = await self.json_rpc(
resp = await self._json_rpc_auth_wrapper(
f'private/{action}', params)
return resp.result
@ -282,10 +336,32 @@ class Client:
async def submit_cancel(self, oid: str):
"""Send cancel request for order id
"""
resp = await self.json_rpc(
resp = await self._json_rpc_auth_wrapper(
'private/cancel', {'order_id': oid})
return resp.result
async def exch_info(
self,
sym: str | None = None,
venue: MarketType = 'option',
expiry: str | None = None,
) -> dict[str, Pair] | Pair:
pair_table: dict[str, Pair] = self._pairs
if (
sym
and (cached_pair := pair_table.get(sym))
):
return cached_pair
if sym:
return pair_table[sym]
else:
return self._pairs
async def symbol_info(
self,
instrument: Optional[str] = None,
@ -293,7 +369,7 @@ class Client:
kind: str = 'option',
expired: bool = False
) -> dict[str, dict]:
) -> dict[str, Pair] | Pair:
'''
Get symbol infos.
@ -308,28 +384,65 @@ class Client:
'expired': str(expired).lower()
}
resp: JSONRPCResult = await self.json_rpc(
resp: JSONRPCResult = await self._json_rpc_auth_wrapper(
'public/get_instruments',
params,
)
# convert to symbol-keyed table
pair_type: Type = PAIRTYPES[kind]
results: list[dict] | None = resp.result
instruments: dict[str, dict] = {
item['instrument_name'].lower(): item
for item in results
}
instruments: dict[str, Pair] = {}
for item in results:
symbol=item['instrument_name'].lower()
try:
pair: Pair = pair_type(
symbol=symbol,
**item
)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid deribit changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://docs.deribit.com/?python#deribit-api-v2-1-1'
)
raise
instruments[symbol] = pair
if instrument is not None:
return instruments[instrument]
return instruments[instrument.lower()]
else:
return instruments
async def cache_symbols(
self,
) -> dict:
venue: MarketType = 'option',
if not self._pairs:
self._pairs = await self.symbol_info()
) -> None:
# lookup internal mkt-specific pair table to update
pair_table: dict[str, Pair] = self._pairs
# make API request(s)
mkt_pairs = await self.symbol_info()
if not mkt_pairs:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
pairs_view_subtable: dict[str, Pair] = {}
for instrument in mkt_pairs:
pair_type: Type = PAIRTYPES[venue]
pair: Pair = pair_type(**mkt_pairs[instrument].to_dict())
pair_table[pair.symbol.upper()] = pair
# update an additional top-level-cross-venue-table
# `._pairs: ChainMap` for search B0
pairs_view_subtable[pair.bs_fqme] = pair
self._pairs.maps.append(pairs_view_subtable)
return self._pairs
@ -337,37 +450,35 @@ class Client:
self,
pattern: str,
limit: int = 30,
) -> dict[str, Any]:
) -> dict[str, Pair]:
'''
Fuzzy search symbology set for pairs matching `pattern`.
'''
pairs: dict[str, Any] = await self.symbol_info()
matches: dict[str, Pair] = match_from_pairs(
pairs: dict[str, Pair] = await self.exch_info()
return match_from_pairs(
pairs=pairs,
query=pattern.upper(),
score_cutoff=35,
limit=limit
)
# repack in name-keyed table
return {
pair['instrument_name'].lower(): pair
for pair in matches.values()
}
async def bars(
self,
symbol: str,
mkt: MktPair,
start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None,
limit: int = 1000,
as_np: bool = True,
) -> dict:
instrument = symbol
) -> list[tuple] | np.ndarray:
instrument: str = mkt.bs_fqme.split('.')[0]
if end_dt is None:
end_dt = pendulum.now('UTC')
end_dt = now('UTC')
if start_dt is None:
start_dt = end_dt.start_of(
@ -377,7 +488,7 @@ class Client:
end_time = deribit_timestamp(end_dt)
# https://docs.deribit.com/#public-get_tradingview_chart_data
resp = await self.json_rpc(
resp = await self._json_rpc_auth_wrapper(
'public/get_tradingview_chart_data',
params={
'instrument_name': instrument.upper(),
@ -387,36 +498,34 @@ class Client:
})
result = KLinesResult(**resp.result)
new_bars = []
new_bars: list[tuple] = []
for i in range(len(result.close)):
_open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i],
result.high[i],
result.low[i],
result.close[i],
result.volume[i],
0
result.volume[i]
]
new_bars.append((i,) + tuple(row))
array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines
return array
if not as_np:
return result
return np.array(
new_bars,
dtype=def_iohlcv_fields
)
async def last_trades(
self,
instrument: str,
count: int = 10
):
resp = await self.json_rpc(
resp = await self._json_rpc_auth_wrapper(
'public/get_last_trades_by_instrument',
params={
'instrument_name': instrument,
@ -428,78 +537,17 @@ class Client:
@acm
async def get_client(
is_brokercheck: bool = False
is_brokercheck: bool = False,
venue: MarketType = 'option',
) -> Client:
async with (
trio.open_nursery() as n,
open_jsonrpc_session(
_testnet_ws_url, dtype=JSONRPCResult) as json_rpc
_ws_url, response_type=JSONRPCResult
) as json_rpc
):
client = Client(json_rpc)
_refresh_token: Optional[str] = None
_access_token: Optional[str] = None
async def _auth_loop(
task_status: TaskStatus = trio.TASK_STATUS_IGNORED
):
"""Background task that adquires a first access token and then will
refresh the access token while the nursery isn't cancelled.
https://docs.deribit.com/?python#authentication-2
"""
renew_time = 10
access_scope = 'trade:read_write'
_expiry_time = time.time()
got_access = False
nonlocal _refresh_token
nonlocal _access_token
while True:
if time.time() - _expiry_time < renew_time:
# if we are close to token expiry time
if _refresh_token != None:
# if we have a refresh token already dont need to send
# secret
params = {
'grant_type': 'refresh_token',
'refresh_token': _refresh_token,
'scope': access_scope
}
else:
# we don't have refresh token, send secret to initialize
params = {
'grant_type': 'client_credentials',
'client_id': client._key_id,
'client_secret': client._key_secret,
'scope': access_scope
}
resp = await json_rpc('public/auth', params)
result = resp.result
_expiry_time = time.time() + result['expires_in']
_refresh_token = result['refresh_token']
if 'access_token' in result:
_access_token = result['access_token']
if not got_access:
# first time this loop runs we must indicate task is
# started, we have auth
got_access = True
task_status.started()
else:
await trio.sleep(renew_time / 2)
# if we have client creds launch auth loop
if client._key_id is not None:
await n.start(_auth_loop)
await client.cache_symbols()
yield client
n.cancel_scope.cancel()
@ -523,7 +571,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay(
fh: FeedHandler,
instrument: Symbol,
instrument: str,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
@ -542,21 +590,33 @@ async def aio_price_feed_relay(
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
{'type': 'bid',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'bsize',
'price': float(data.bid_price), 'size': float(data.bid_size)},
{'type': 'ask',
'price': float(data.ask_price), 'size': float(data.ask_size)},
{'type': 'asize',
'price': float(data.ask_price), 'size': float(data.ask_size)}
{
'type': 'bid',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'bsize',
'price': float(data.bid_price),
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
]
}))
sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed(
DERIBIT,
channels=[TRADES, L1_BOOK],
symbols=[piker_sym_to_cb_sym(instrument)],
symbols=[sym],
callbacks={
TRADES: _trade,
L1_BOOK: _l1
@ -597,9 +657,9 @@ async def maybe_open_price_feed(
async with maybe_open_context(
acm_func=open_price_feed,
kwargs={
'instrument': instrument
'instrument': instrument.split('.')[0]
},
key=f'{instrument}-price',
key=f'{instrument.split('.')[0]}-price',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)
@ -664,10 +724,10 @@ async def maybe_open_order_feed(
async with maybe_open_context(
acm_func=open_order_feed,
kwargs={
'instrument': instrument,
'instrument': instrument.split('.')[0],
'fh': fh
},
key=f'{instrument}-order',
key=f'{instrument.split('.')[0]}-order',
) as (cache_hit, feed):
if cache_hit:
yield broadcast_receiver(feed, 10)

View File

@ -21,18 +21,33 @@ Deribit backend.
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, Callable
from pprint import pformat
import time
import trio
from trio_typing import TaskStatus
import pendulum
from pendulum import (
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy
import numpy as np
import tractor
from piker.brokers import open_cached_client
from piker.accounting import (
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.data.validate import FeedInit
from piker.brokers._util import (
BrokerError,
DataUnavailable,
@ -47,9 +62,13 @@ from cryptofeed.symbols import Symbol
from .api import (
Client, Trade,
get_config,
str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed
)
from .venues import (
Pair,
OptionPair,
)
_spawn_kwargs = {
'infect_asyncio': True,
@ -64,36 +83,107 @@ async def open_history_client(
mkt: MktPair,
) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client:
async def get_ohlc(
end_dt: Optional[datetime] = None,
start_dt: Optional[datetime] = None,
timeframe: float,
end_dt: datetime | None = None,
start_dt: datetime | None = None,
) -> tuple[
np.ndarray,
datetime, # start
datetime, # end
]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array = await client.bars(
instrument,
array: np.ndarray = await client.bars(
mkt,
start_dt=start_dt,
end_dt=end_dt,
)
if len(array) == 0:
raise DataUnavailable
raise NoData(
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = pendulum.from_timestamp(array[-1]['time'])
start_dt = from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3}
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
pair: Pair = await client.exch_info(
sym=pair_str,
)
mkt_mode = pair.venue
client.mkt_mode = mkt_mode
dst: Asset | None = assets.get(pair.bs_dst_asset)
src: Asset | None = assets.get(pair.bs_src_asset)
mkt = MktPair(
dst=dst,
src=src,
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
expiry=pair.expiry,
venue=mkt_mode,
broker='deribit',
_atype=mkt_mode,
_fqme_without_src=True,
)
return mkt, pair
async def stream_quotes(
send_chan: trio.abc.SendChannel,
@ -108,31 +198,26 @@ async def stream_quotes(
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
sym = symbols[0]
sym = symbols[0].split('.')[0]
init_msgs: list[FeedInit] = []
async with (
open_cached_client('deribit') as client,
send_chan as send_chan
):
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
mkt, pair = await get_mkt_info(sym)
# build out init msgs according to latest spec
init_msgs.append(
FeedInit(mkt_info=mkt)
)
nsym = piker_sym_to_cb_sym(sym)
async with maybe_open_price_feed(sym) as stream:
cache = await client.cache_symbols()
cache = client._pairs
last_trades = (await client.last_trades(
cb_sym_to_deribit_inst(nsym), count=1)).trades
@ -174,12 +259,21 @@ async def open_symbol_search(
async with open_cached_client('deribit') as client:
# load all symbols locally for fast search
cache = await client.cache_symbols()
cache = client._pairs
await ctx.started()
async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream:
# repack in dict form
await stream.send(
await client.search_symbols(pattern))
# NOTE: pattern fuzzy-matching is done within
# the methd impl.
pairs: dict[str, Pair] = await client.search_symbols(
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)

View File

@ -0,0 +1,191 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
import pendulum
from typing import (
Literal,
)
from decimal import Decimal
from msgspec import field
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}]
tick_size_steps: list[dict[str, float]]
@property
def price_tick(self) -> Decimal:
return Decimal(str(self.tick_size_steps[0]['above_price']))
@property
def size_tick(self) -> Decimal:
return Decimal(str(self.tick_size))
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
@property
def expiry(self) -> str:
iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat()
return iso_date
@property
def venue(self) -> str:
return 'option'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.symbol}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}
class JSONRPCResult(Struct):
id: int
usIn: int
usOut: int
usDiff: int
testnet: bool
jsonrpc: str = '2.0'
error: Optional[dict] = None
result: Optional[list[dict]] = None
class JSONRPCChannel(Struct):
method: str
params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct):
low: list[float]
cost: list[float]
high: list[float]
open: list[float]
close: list[float]
ticks: list[int]
status: str
volume: list[float]
class Trade(Struct):
iv: float
price: float
amount: float
trade_id: str
contracts: float
direction: str
trade_seq: int
timestamp: int
mark_price: float
index_price: float
tick_direction: int
instrument_name: str
combo_id: Optional[str] = '',
combo_trade_id: Optional[int] = 0,
block_trade_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0,
class LastTradesResult(Struct):
trades: list[Trade]
has_more: bool

View File

@ -111,6 +111,10 @@ class KucoinMktPair(Struct, frozen=True):
quoteMaxSize: float
quoteMinSize: float
symbol: str # our bs_mktid, kucoin's internal id
feeCategory: int
makerFeeCoefficient: float
takerFeeCoefficient: float
st: bool
class AccountTrade(Struct, frozen=True):
@ -593,7 +597,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
'''
async with (
httpx.AsyncClient(
base_url=f'https://api.kucoin.com/api',
base_url='https://api.kucoin.com/api',
) as trio_client,
):
client = Client(httpx_client=trio_client)
@ -637,7 +641,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.info('Starting ping task for kucoin ws connection')
log.warning('Starting ping task for kucoin ws connection')
n.start_soon(ping_server)
yield
@ -649,9 +653,14 @@ async def open_ping_task(
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, KucoinMktPair]:
) -> tuple[
MktPair,
KucoinMktPair,
]:
'''
Query for and return a `MktPair` and `KucoinMktPair`.
Query for and return both a `piker.accounting.MktPair` and
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
'''
async with open_cached_client('kucoin') as client:
@ -726,6 +735,8 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str)
init_msgs.append(
FeedInit(mkt_info=mkt)
@ -733,7 +744,11 @@ async def stream_quotes(
ws: NoBsWs
token, ping_interval = await client._get_ws_token()
connect_id = str(uuid4())
log.info('API reported ping_interval: {ping_interval}\n')
connect_id: str = str(uuid4())
typ: str
quote: dict
async with (
open_autorecon_ws(
(
@ -747,20 +762,37 @@ async def stream_quotes(
),
) as ws,
open_ping_task(ws, ping_interval, connect_id),
aclosing(stream_messages(ws, sym_str)) as msg_gen,
aclosing(
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
):
typ, quote = await anext(msg_gen)
typ, quote = await anext(iter_quotes)
while typ != 'trade':
# take care to not unblock here until we get a real
# trade quote
typ, quote = await anext(msg_gen)
# trade quote?
# ^TODO, remove this right?
# -[ ] what often blocks chart boot/new-feed switching
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote))
feed_is_live.set()
async for typ, msg in msg_gen:
await send_chan.send({sym_str: msg})
# XXX NOTE, DO NOT include the `.<backend>` suffix!
# OW the sampling loop will not broadcast correctly..
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm
@ -815,7 +847,7 @@ async def subscribe(
)
async def stream_messages(
async def iter_normed_quotes(
ws: NoBsWs,
sym: str,
@ -846,6 +878,9 @@ async def stream_messages(
yield 'trade', {
'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price,
'brokerd_ts': last_trade_ts,
'ticks': [
@ -938,7 +973,7 @@ async def open_history_client(
if end_dt is None:
inow = round(time.time())
print(
log.debug(
f'difference in time between load and processing'
f'{inow - times[-1]}'
)

View File

@ -653,7 +653,11 @@ class Router(Struct):
flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
@ -716,7 +720,7 @@ class Router(Struct):
subs = self.subscribers[sub_key]
sent_some: bool = False
for client_stream in subs:
for client_stream in subs.copy():
try:
await client_stream.send(msg)
sent_some = True
@ -1010,6 +1014,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
if not status_msg.req:
# likely some order change state?
await tractor.pause()
else:
await router.client_broadcast(
status_msg.req.symbol,
status_msg,

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query',
loglevel=config['loglevel'] if tl else None,
),
tractor.get_arbiter(
tractor.get_registry(
host=host,
port=ports[0]
) as portal

View File

@ -25,6 +25,7 @@ from collections import (
defaultdict,
)
from contextlib import asynccontextmanager as acm
from functools import partial
import time
from typing import (
Any,
@ -42,7 +43,7 @@ from tractor.trionics import (
maybe_open_nursery,
)
import trio
from trio_typing import TaskStatus
from trio import TaskStatus
from .ticktools import (
frame_ticks,
@ -70,6 +71,7 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler:
'''
Global sampling engine registry.
@ -79,9 +81,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
time-step-sample a (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below
`register_with_sampler()`.
'''
service_nursery: None | trio.Nursery = None
@ -375,7 +377,10 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys()))
await ctx.started(
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
if open_index_stream:
try:
@ -419,7 +424,6 @@ async def register_with_sampler(
async def spawn_samplerd(
loglevel: str | None = None,
**extra_tractor_kwargs
@ -429,7 +433,10 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting.
'''
from piker.service import Services
from piker.service import (
get_service_mngr,
ServiceMngr,
)
dname = 'samplerd'
log.info(f'Spawning `{dname}`')
@ -437,26 +444,33 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api?
async with Services.locks[dname + '_singleton']:
mngr: ServiceMngr = get_service_mngr()
already_started: bool = dname in mngr.service_tasks
if dname not in Services.service_tasks:
async with mngr._locks[dname + '_singleton']:
ctx: Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
portal = await Services.actor_n.start_actor(
dname,
# proxy-through to tractor
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
await Services.start_service_task(
dname,
portal,
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
if not already_started:
assert (
ctx
and
ctx.portal
and
not ctx.cancel_called
)
return True
@ -889,6 +903,7 @@ async def uniform_rate_send(
# to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors!
trio.EndOfChannel,
trio.ClosedResourceError,
trio.BrokenResourceError,
ConnectionResetError,

View File

@ -273,7 +273,7 @@ async def _reconnect_forever(
nobsws._connected.set()
await trio.sleep_forever()
except HandshakeError:
log.exception(f'Retrying connection')
log.exception('Retrying connection')
# ws & nursery block ends
@ -359,8 +359,8 @@ async def open_autorecon_ws(
'''
JSONRPC response-request style machinery for transparent multiplexing of msgs
over a NoBsWs.
JSONRPC response-request style machinery for transparent multiplexing
of msgs over a NoBsWs.
'''
@ -377,16 +377,20 @@ async def open_jsonrpc_session(
url: str,
start_id: int = 0,
response_type: type = JSONRPCResult,
request_type: Optional[type] = None,
request_hook: Optional[Callable] = None,
error_hook: Optional[Callable] = None,
# request_type: Optional[type] = None,
# request_hook: Optional[Callable] = None,
# error_hook: Optional[Callable] = None,
) -> Callable[[str, dict], dict]:
# NOTE, store all request msgs so we can raise errors on the
# caller side!
req_msgs: dict[int, dict] = {}
async with (
trio.open_nursery() as n,
open_autorecon_ws(url) as ws
):
rpc_id: Iterable = count(start_id)
rpc_id: Iterable[int] = count(start_id)
rpc_results: dict[int, dict] = {}
async def json_rpc(method: str, params: dict) -> dict:
@ -394,27 +398,41 @@ async def open_jsonrpc_session(
perform a json rpc call and wait for the result, raise exception in
case of error field present on response
'''
nonlocal req_msgs
req_id: int = next(rpc_id)
msg = {
'jsonrpc': '2.0',
'id': next(rpc_id),
'id': req_id,
'method': method,
'params': params
}
_id = msg['id']
rpc_results[_id] = {
result = rpc_results[_id] = {
'result': None,
'event': trio.Event()
'error': None,
'event': trio.Event(), # signal caller resp arrived
}
req_msgs[_id] = msg
await ws.send_msg(msg)
# wait for reponse before unblocking requester code
await rpc_results[_id]['event'].wait()
ret = rpc_results[_id]['result']
if (maybe_result := result['result']):
ret = maybe_result
del rpc_results[_id]
else:
err = result['error']
raise Exception(
f'JSONRPC request failed\n'
f'req: {msg}\n'
f'resp: {err}\n'
)
if ret.error is not None:
raise Exception(json.dumps(ret.error, indent=4))
@ -428,6 +446,7 @@ async def open_jsonrpc_session(
the server side.
'''
nonlocal req_msgs
async for msg in ws:
match msg:
case {
@ -451,15 +470,29 @@ async def open_jsonrpc_session(
'params': _,
}:
log.debug(f'Recieved\n{msg}')
if request_hook:
await request_hook(request_type(**msg))
# if request_hook:
# await request_hook(request_type(**msg))
case {
'error': error
}:
log.warning(f'Recieved\n{error}')
if error_hook:
await error_hook(response_type(**msg))
# if error_hook:
# await error_hook(response_type(**msg))
# retreive orig request msg, set error
# response in original "result" msg,
# THEN FINALLY set the event to signal caller
# to raise the error in the parent task.
req_id: int = msg['id']
req_msg: dict = req_msgs[req_id]
result: dict = rpc_results[req_id]
result['error'] = error
result['event'].set()
log.error(
f'JSONRPC request failed\n'
f'req: {req_msg}\n'
f'resp: {error}\n'
)
case _:
log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')

View File

@ -30,7 +30,11 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere?
'''
from ._mngr import Services as Services
from ._mngr import (
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._registry import (
_tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr,

View File

@ -21,7 +21,6 @@
from __future__ import annotations
import os
from typing import (
Optional,
Any,
ClassVar,
)
@ -30,13 +29,13 @@ from contextlib import (
)
import tractor
import trio
from ._util import (
get_console_log,
)
from ._mngr import (
Services,
open_service_mngr,
ServiceMngr,
)
from ._registry import ( # noqa
_tractor_kwargs,
@ -59,7 +58,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [],
loglevel: Optional[str] = None,
loglevel: str|None = None,
# XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production.
@ -69,7 +68,7 @@ async def open_piker_runtime(
# and spawn the service tree distributed per that.
start_method: str = 'trio',
tractor_runtime_overrides: dict | None = None,
tractor_runtime_overrides: dict|None = None,
**tractor_kwargs,
) -> tuple[
@ -119,6 +118,10 @@ async def open_piker_runtime(
# spawn other specialized daemons I think?
enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs,
) as actor,
@ -167,12 +170,13 @@ async def open_pikerd(
**kwargs,
) -> Services:
) -> ServiceMngr:
'''
Start a root piker daemon with an indefinite lifetime.
Start a root piker daemon actor (aka `pikerd`) with an indefinite
lifetime.
A root actor nursery is created which can be used to create and keep
alive underling services (see below).
A root actor-nursery is created which can be used to spawn and
supervise underling service sub-actors (see below).
'''
# NOTE: for the root daemon we always enable the root
@ -199,8 +203,6 @@ async def open_pikerd(
root_actor,
reg_addrs,
),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
):
for addr in reg_addrs:
if addr not in root_actor.accept_addrs:
@ -209,25 +211,17 @@ async def open_pikerd(
'Maybe you have another daemon already running?'
)
# assign globally for future daemon/task creation
Services.actor_n = actor_nursery
Services.service_n = service_nursery
Services.debug_mode = debug_mode
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
mngr: ServiceMngr
async with open_service_mngr(
debug_mode=debug_mode,
) as mngr:
yield mngr
# TODO: do we even need this?
# @acm
# async def maybe_open_runtime(
# loglevel: Optional[str] = None,
# loglevel: str|None = None,
# **kwargs,
# ) -> None:
@ -256,7 +250,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None,
**kwargs,
) -> tractor._portal.Portal | ClassVar[Services]:
) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
'''
If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout,
)
from ._mngr import Services
from ._mngr import ServiceMngr
from ._util import (
log, # sub-sys logger
get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm
async def start_ahab_service(
services: Services,
services: ServiceMngr,
service_name: str,
# endpoint config passed as **kwargs
@ -549,7 +549,8 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container')
except (
trio.MultiError,
# trio.MultiError,
ExceptionGroup,
) as err:
for subexc in err.exceptions:
if isinstance(subexc, PermissionError):

View File

@ -26,14 +26,17 @@ from typing import (
from contextlib import (
asynccontextmanager as acm,
)
from collections import defaultdict
import tractor
import trio
from ._util import (
log, # sub-sys logger
)
from ._mngr import (
Services,
get_service_mngr,
ServiceMngr,
)
from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service
@ -41,15 +44,14 @@ from ._registry import find_service
@acm
async def maybe_spawn_daemon(
service_name: str,
service_task_target: Callable,
spawn_args: dict[str, Any],
loglevel: str | None = None,
singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs,
) -> tractor.Portal:
@ -67,7 +69,7 @@ async def maybe_spawn_daemon(
'''
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Services.locks[service_name]
lock = _locks[service_name]
await lock.acquire()
async with find_service(
@ -132,7 +134,65 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal:
lock.release()
yield portal
await portal.cancel_actor()
# --- ---- ---
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
async def spawn_emsd(
@ -147,21 +207,22 @@ async def spawn_emsd(
"""
log.info('Spawning emsd')
portal = await Services.actor_n.start_actor(
smngr: ServiceMngr = get_service_mngr()
portal = await smngr.actor_n.start_actor(
'emsd',
enable_modules=[
'piker.clearing._ems',
'piker.clearing._client',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
debug_mode=smngr.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
# non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd
await Services.start_service_task(
await smngr.start_service_task(
'emsd',
portal,

View File

@ -18,16 +18,29 @@
daemon-service management API.
"""
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import (
Callable,
Any,
)
import trio
from trio_typing import TaskStatus
import msgspec
import tractor
import trio
from trio import TaskStatus
from tractor import (
ActorNursery,
current_actor,
ContextCancelled,
Context,
@ -39,6 +52,130 @@ from ._util import (
)
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
# library.
@ -46,31 +183,46 @@ from ._util import (
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
class Services:
@dataclass
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
'''
A multi-subactor-as-service manager.
actor_n: tractor._supervise.ActorNursery
Spawn, supervise and monitor service/daemon subactors in a SC
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
debug_mode: bool = False # tractor sub-actor debug mode flag
service_tasks: dict[
str,
tuple[
trio.CancelScope,
Context,
Portal,
trio.Event,
]
] = {}
locks = defaultdict(trio.Lock)
] = field(default_factory=dict)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task(
self,
name: str,
portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
target: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> (trio.CancelScope, Context):
) -> (trio.CancelScope, Context, Any):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
@ -83,6 +235,7 @@ class Services:
task_status: TaskStatus[
tuple[
trio.CancelScope,
Context,
trio.Event,
Any,
]
@ -90,64 +243,87 @@ class Services:
) -> Any:
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs:
try:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, first):
) as (ctx, started):
# unblock once the remote context has started
complete = trio.Event()
task_status.started((cs, complete, first))
task_status.started((
cs,
ctx,
complete,
started,
))
log.info(
f'`pikerd` service {name} started with value {first}'
f'`pikerd` service {name} started with value {started}'
)
try:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.result()
ctx_res: Any = await ctx.wait_for_result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (await portal.result(), ctx_res)
return (
await portal.wait_for_result(),
ctx_res,
)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.channel.uid
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n'
f'canceller: {canceller}\n'
)
else:
raise
finally:
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, complete, first = await self.service_n.start(open_context_in_task)
cs, sub_ctx, complete, started = await self.service_n.start(
open_context_in_task
)
# store the cancel scope and portal for later cancellation or
# retstart if needed.
self.service_tasks[name] = (cs, portal, complete)
self.service_tasks[name] = (cs, sub_ctx, portal, complete)
return cs, sub_ctx, started
return cs, first
@classmethod
async def cancel_service(
self,
name: str,
@ -158,8 +334,80 @@ class Services:
'''
log.info(f'Cancelling `pikerd` service {name}')
cs, portal, complete = self.service_tasks[name]
cs.cancel()
cs, sub_ctx, portal, complete = self.service_tasks[name]
# cs.cancel()
await sub_ctx.cancel()
await complete.wait()
assert name not in self.service_tasks, \
if name in self.service_tasks:
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Serice task for {name} not terminated?'
)
# assert name not in self.service_tasks, \
# f'Serice task for {name} not terminated?'
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
debug_mode: bool = False,
**tractor_actor_kwargs,
) -> Context:
'''
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
indefinitely.
Services can be cancelled/shutdown using `.cancel_service()`.
'''
entry: tuple|None = self.service_tasks.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_tasks:
portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**tractor_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(cs, sub_ctx, started) = await self.start_service_task(
daemon_name,
portal,
ctx_ep,
**ctx_kwargs,
)
return sub_ctx
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import (
# open_service_mngr,
# ServiceMngr,
# )
# mngr: ServiceMngr|None = None
# with tractor.hilevel.open_service_mngr() as mngr:
# Services = proxy(mngr)

View File

@ -21,11 +21,13 @@ from typing import (
TYPE_CHECKING,
)
# TODO: oof, needs to be changed to `httpx`!
import asks
if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger
from ._util import (
@ -127,7 +129,7 @@ def start_elasticsearch(
@acm
async def start_ahab_daemon(
service_mngr: Services,
service_mngr: ServiceMngr,
user_config: dict | None = None,
loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc
from ..data.feed import maybe_open_feed
from . import Services
from . import ServiceMngr
from ._util import (
log, # sub-sys logger
get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm
async def start_ahab_daemon(
service_mngr: Services,
service_mngr: ServiceMngr,
user_config: dict | None = None,
loglevel: str | None = None,

View File

@ -458,13 +458,15 @@ async def start_backfill(
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
if timeframe > 1:
await tractor.pause()
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
# this right!?
# -[ ] in the `ib` case we could maybe offer some way
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return
assert (
@ -578,6 +580,7 @@ async def start_backfill(
'crypto',
'crypto_currency',
'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}:
# for now, our table key schema is not including
# the dst[/src] source asset token.

2326
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -50,10 +50,8 @@ attrs = "^23.1.0"
bidict = "^0.22.1"
colorama = "^0.4.6"
colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86"
msgspec = "^0.18.0"
msgspec = "^0.18.6"
numba = "^0.59.0"
numpy = "^1.25"
polars = "^0.18.13"
@ -71,11 +69,13 @@ pdbp = "^1.5.0"
trio = "^0.24"
pendulum = "^3.0.0"
httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor]
develop = true
git = 'https://github.com/goodboy/tractor.git'
branch = 'asyncio_debugger_support'
git = 'https://pikers.dev/goodboy/tractor.git'
branch = 'aio_abandons'
# path = "../tractor"
[tool.poetry.dependencies.asyncvnc]
@ -109,6 +109,8 @@ pytest = "^6.0.0"
elasticsearch = "^8.9.0"
xonsh = "^0.14.2"
prompt-toolkit = "3.0.40"
cython = "^3.0.0"
greenback = "^1.1.1"
# console ehancements and eventually remote debugging
# extras/helpers.

View File

@ -10,7 +10,7 @@ from piker import (
config,
)
from piker.service import (
Services,
get_service_mngr,
)
from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager,
):
# this proc/actor is the pikerd
assert service_manager is Services
assert service_manager is get_service_mngr()
async with tractor.wait_for_actor(
'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor
from uuid import uuid4
from piker.service import Services
from piker.service import ServiceMngr
from piker.log import get_logger
from piker.clearing._messages import (
Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker(
open_test_pikerd: Services,
open_test_pikerd: ServiceMngr,
loglevel: str,
):
async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import (
find_service,
Services,
ServiceMngr,
)
from piker.data import (
open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main():
port = 6666
daemon_addr = ('127.0.0.1', port)
services: Services
services: ServiceMngr
async with (
open_test_pikerd(