Compare commits

..

1 Commits

Author SHA1 Message Date
Tyler Goodlet e79554d0ac data._web_bs: try to raise jsonrpc errors in parent task 2025-01-29 16:03:57 -03:00
28 changed files with 719 additions and 3092 deletions

View File

@ -1,82 +0,0 @@
with (import <nixpkgs> {});
with python312Packages;
let
glibStorePath = lib.getLib glib;
qtpyStorePath = lib.getLib qtpy;
pyqt6StorePath = lib.getLib pyqt6;
pyqt6SipStorePath = lib.getLib pyqt6-sip;
qt6baseStorePath = lib.getLib qt6.qtbase;
rapidfuzzStorePath = lib.getLib rapidfuzz;
qdarkstyleStorePath = lib.getLib qdarkstyle;
in
stdenv.mkDerivation {
name = "piker-qt6-poetry-shell";
buildInputs = [
# System requirements.
glib
qt6.qtbase
libgcc.lib
# Python requirements.
python312Full
poetry-core
qdarkstyle
rapidfuzz
pyqt6
qtpy
];
src = null;
shellHook = ''
set -e
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${libgcc.lib}/lib:${glibStorePath}/lib
# Set the Qt plugin path
# export QT_DEBUG_PLUGINS=1
QTBASE_PATH="${qt6baseStorePath}"
echo "qtbase path: $QTBASE_PATH"
echo ""
export QT_PLUGIN_PATH="$QTBASE_PATH/lib/qt-6/plugins"
export QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
echo "qt plugin path: $QT_PLUGIN_PATH"
echo ""
# Maybe create venv & install deps
poetry install --with uis
# Use pyqt6 from System, patch activate script
ACTIVATE_SCRIPT_PATH="$(poetry env info --path)/bin/activate"
export RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
export QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
export QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
export PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
export PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
echo "rapidfuzz at: $RPDFUZZ_PATH"
echo "qdarkstyle at: $QDRKSTYLE_PATH"
echo "qtpy at: $QTPY_PATH"
echo "pyqt6 at: $PYQT6_PATH"
echo "pyqt6-sip at: $PYQT6_SIP_PATH"
echo ""
PATCH="export PYTHONPATH=\""
PATCH="$PATCH\$RPDFUZZ_PATH"
PATCH="$PATCH:\$QDRKSTYLE_PATH"
PATCH="$PATCH:\$QTPY_PATH"
PATCH="$PATCH:\$PYQT6_PATH"
PATCH="$PATCH:\$PYQT6_SIP_PATH"
PATCH="$PATCH\""
if grep -q "$PATCH" "$ACTIVATE_SCRIPT_PATH"; then
echo "venv is already patched."
else
echo "patching $ACTIVATE_SCRIPT_PATH to use pyqt6 from nixos..."
sed -i "\$i$PATCH" $ACTIVATE_SCRIPT_PATH
fi
poetry shell
'';
}

View File

@ -30,8 +30,7 @@ from types import ModuleType
from typing import ( from typing import (
Any, Any,
Iterator, Iterator,
Generator, Generator
TYPE_CHECKING,
) )
import pendulum import pendulum
@ -60,10 +59,8 @@ from ..clearing._messages import (
BrokerdPosition, BrokerdPosition,
) )
from piker.types import Struct from piker.types import Struct
from piker.log import get_logger from piker.data._symcache import SymbologyCache
from ..log import get_logger
if TYPE_CHECKING:
from piker.data._symcache import SymbologyCache
log = get_logger(__name__) log = get_logger(__name__)
@ -496,17 +493,6 @@ class Account(Struct):
_mktmap_table: dict[str, MktPair] | None = None, _mktmap_table: dict[str, MktPair] | None = None,
only_require: list[str]|True = True,
# ^list of fqmes that are "required" to be processed from
# this ledger pass; we often don't care about others and
# definitely shouldn't always error in such cases.
# (eg. broker backend loaded that doesn't yet supsport the
# symcache but also, inside the paper engine we don't ad-hoc
# request `get_mkt_info()` for every symbol in the ledger,
# only the one for which we're simulating against).
# TODO, not sure if there's a better soln for this, ideally
# all backends get symcache support afap i guess..
) -> dict[str, Position]: ) -> dict[str, Position]:
''' '''
Update the internal `.pps[str, Position]` table from input Update the internal `.pps[str, Position]` table from input
@ -549,32 +535,11 @@ class Account(Struct):
if _mktmap_table is None: if _mktmap_table is None:
raise raise
required: bool = (
only_require is True
or (
only_require is not True
and
fqme in only_require
)
)
# XXX: caller is allowed to provide a fallback # XXX: caller is allowed to provide a fallback
# mktmap table for the case where a new position is # mktmap table for the case where a new position is
# being added and the preloaded symcache didn't # being added and the preloaded symcache didn't
# have this entry prior (eg. with frickin IB..) # have this entry prior (eg. with frickin IB..)
if ( mkt = _mktmap_table[fqme]
not (mkt := _mktmap_table.get(fqme))
and
required
):
raise
elif not required:
continue
else:
# should be an entry retreived somewhere
assert mkt
if not (pos := pps.get(bs_mktid)): if not (pos := pps.get(bs_mktid)):
@ -691,7 +656,7 @@ class Account(Struct):
def write_config(self) -> None: def write_config(self) -> None:
''' '''
Write the current account state to the user's account TOML file, normally Write the current account state to the user's account TOML file, normally
something like `pps.toml`. something like ``pps.toml``.
''' '''
# TODO: show diff output? # TODO: show diff output?

View File

@ -51,7 +51,6 @@ __brokers__: list[str] = [
'ib', 'ib',
'kraken', 'kraken',
'kucoin', 'kucoin',
'deribit',
# broken but used to work # broken but used to work
# 'questrade', # 'questrade',
@ -62,6 +61,7 @@ __brokers__: list[str] = [
# wstrade # wstrade
# iex # iex
# deribit
# bitso # bitso
] ]

View File

@ -23,7 +23,6 @@ from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from functools import partial
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@ -191,17 +190,14 @@ def broker_init(
async def spawn_brokerd( async def spawn_brokerd(
brokername: str, brokername: str,
loglevel: str | None = None, loglevel: str | None = None,
**tractor_kwargs, **tractor_kwargs,
) -> bool: ) -> bool:
'''
Spawn a `brokerd.<backendname>` subactor service daemon
using `pikerd`'s service mngr.
'''
from piker.service._util import log # use service mngr log from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon') log.info(f'Spawning {brokername} broker daemon')
@ -221,35 +217,27 @@ async def spawn_brokerd(
# ask `pikerd` to spawn a new sub-actor and manage it under its # ask `pikerd` to spawn a new sub-actor and manage it under its
# actor nursery # actor nursery
from piker.service import ( from piker.service import Services
get_service_mngr,
ServiceMngr,
)
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
mngr: ServiceMngr = get_service_mngr()
ctx: tractor.Context = await mngr.start_service(
daemon_name=dname,
ctx_ep=partial(
# signature of target root-task endpoint
daemon_fixture_ep,
# passed to daemon_fixture_ep(**kwargs) dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
brokername=brokername, portal = await Services.actor_n.start_actor(
loglevel=loglevel, dname,
), enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
debug_mode=mngr.debug_mode, debug_mode=Services.debug_mode,
loglevel=loglevel,
enable_modules=(
_data_mods
+
tractor_kwargs.pop('enable_modules')
),
**tractor_kwargs **tractor_kwargs
) )
assert (
not ctx.cancel_called # NOTE: the service mngr expects an already spawned actor + its
and ctx.portal # parent side # portal ref in order to do non-blocking setup of brokerd
and dname in ctx.chan.uid # subactor is named as desired # service nursery.
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
) )
return True return True
@ -274,7 +262,8 @@ async def maybe_spawn_brokerd(
from piker.service import maybe_spawn_daemon from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon( async with maybe_spawn_daemon(
service_name=f'brokerd.{brokername}',
f'brokerd.{brokername}',
service_task_target=spawn_brokerd, service_task_target=spawn_brokerd,
spawn_args={ spawn_args={
'brokername': brokername, 'brokername': brokername,

View File

@ -567,7 +567,6 @@ class Client:
) -> str: ) -> str:
return { return {
'USDTM': 'usdtm_futes', 'USDTM': 'usdtm_futes',
'SPOT': 'spot',
# 'COINM': 'coin_futes', # 'COINM': 'coin_futes',
# ^-TODO-^ bc someone might want it..? # ^-TODO-^ bc someone might want it..?
}[pair.venue] }[pair.venue]

View File

@ -181,6 +181,7 @@ class FutesPair(Pair):
quoteAsset: str # 'USDT', quoteAsset: str # 'USDT',
quotePrecision: int # 8, quotePrecision: int # 8,
requiredMarginPercent: float # '5.0000', requiredMarginPercent: float # '5.0000',
settlePlan: int # 0,
timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'], timeInForce: list[str] # ['GTC', 'IOC', 'FOK', 'GTX'],
triggerProtect: float # '0.0500', triggerProtect: float # '0.0500',
underlyingSubType: list[str] # ['PoW'], underlyingSubType: list[str] # ['PoW'],

View File

@ -25,7 +25,6 @@ from .api import (
get_client, get_client,
) )
from .feed import ( from .feed import (
get_mkt_info,
open_history_client, open_history_client,
open_symbol_search, open_symbol_search,
stream_quotes, stream_quotes,
@ -35,20 +34,15 @@ from .feed import (
# open_trade_dialog, # open_trade_dialog,
# norm_trade_records, # norm_trade_records,
# ) # )
from .venues import (
OptionPair,
)
log = get_logger(__name__) log = get_logger(__name__)
__all__ = [ __all__ = [
'get_client', 'get_client',
# 'trades_dialogue', # 'trades_dialogue',
'get_mkt_info',
'open_history_client', 'open_history_client',
'open_symbol_search', 'open_symbol_search',
'stream_quotes', 'stream_quotes',
'OptionPair',
# 'norm_trade_records', # 'norm_trade_records',
] ]

View File

@ -19,14 +19,10 @@ Deribit backend.
''' '''
import asyncio import asyncio
from collections import ChainMap
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from datetime import datetime from datetime import datetime
from decimal import (
Decimal,
)
from functools import partial from functools import partial
import time import time
from typing import ( from typing import (
@ -35,7 +31,7 @@ from typing import (
Callable, Callable,
) )
from pendulum import now import pendulum
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
@ -55,18 +51,7 @@ from cryptofeed.defines import (
OPTION, CALL, PUT OPTION, CALL, PUT
) )
from cryptofeed.symbols import Symbol from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from .venues import (
MarketType,
PAIRTYPES,
Pair,
OptionPair,
)
from piker.accounting import (
Asset,
MktPair,
)
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
match_from_pairs, match_from_pairs,
@ -95,19 +80,19 @@ _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'
class JSONRPCResult(Struct): class JSONRPCResult(Struct):
jsonrpc: str = '2.0'
id: int id: int
result: Optional[list[dict]] = None
error: Optional[dict] = None
usIn: int usIn: int
usOut: int usOut: int
usDiff: int usDiff: int
testnet: bool testnet: bool
jsonrpc: str = '2.0'
result: Optional[list[dict]] = None
error: Optional[dict] = None
class JSONRPCChannel(Struct): class JSONRPCChannel(Struct):
jsonrpc: str = '2.0'
method: str method: str
params: dict params: dict
jsonrpc: str = '2.0'
class KLinesResult(Struct): class KLinesResult(Struct):
@ -131,12 +116,9 @@ class Trade(Struct):
instrument_name: str instrument_name: str
index_price: float index_price: float
direction: str direction: str
contracts: float
amount: float
combo_trade_id: Optional[int] = 0, combo_trade_id: Optional[int] = 0,
combo_id: Optional[str] = '', combo_id: Optional[str] = '',
block_trade_leg_count: Optional[int] = 0, amount: float
block_trade_id: Optional[str] = '',
class LastTradesResult(Struct): class LastTradesResult(Struct):
trades: list[Trade] trades: list[Trade]
@ -160,15 +142,13 @@ def str_to_cb_sym(name: str) -> Symbol:
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
new_expiry_date = get_values_from_cb_normalized_date(expiry_date)
return Symbol( return Symbol(
base=base, base, quote,
quote=quote,
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=new_expiry_date) expiry_date=expiry_date,
expiry_normalize=False)
def piker_sym_to_cb_sym(name: str) -> Symbol: def piker_sym_to_cb_sym(name: str) -> Symbol:
@ -179,103 +159,68 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:
if option_type == 'P': if option_type == 'P':
option_type = PUT option_type = PUT
elif option_type == 'C': elif option_type == 'C':
option_type = CALL option_type = CALL
else: else:
raise Exception("Couldn\'t parse option type") raise Exception("Couldn\'t parse option type")
return Symbol( return Symbol(
base=base, base, quote,
quote=quote,
type=OPTION, type=OPTION,
strike_price=strike_price, strike_price=strike_price,
option_type=option_type, option_type=option_type,
expiry_date=expiry_date) expiry_date=expiry_date.upper())
def cb_sym_to_deribit_inst(sym: Symbol): def cb_sym_to_deribit_inst(sym: Symbol):
new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) # cryptofeed normalized
otype = 'C' if sym.option_type == CALL else 'P' cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' # deribit specific
months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
exp = sym.expiry_date
def get_values_from_cb_normalized_date(expiry_date: str) -> str:
# deribit specific
cb_norm = [
'F', 'G', 'H', 'J',
'K', 'M', 'N', 'Q',
'U', 'V', 'X', 'Z'
]
months = [
'JAN', 'FEB', 'MAR', 'APR',
'MAY', 'JUN', 'JUL', 'AUG',
'SEP', 'OCT', 'NOV', 'DEC'
]
# YYMDD # YYMDD
# 01234 # 01234
day, month, year = ( year, month, day = (
expiry_date[3:], exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])
months[cb_norm.index(expiry_date[2:3])],
expiry_date[:2] otype = 'C' if sym.option_type == CALL else 'P'
)
return f'{day}{month}{year}' return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'
def get_config() -> dict[str, Any]: def get_config() -> dict[str, Any]:
conf: dict conf, path = config.load()
path: Path
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
section: dict = {}
section = conf.get('deribit') section = conf.get('deribit')
section['log'] = {} # TODO: document why we send this, basically because logging params for cryptofeed
section['log']['filename'] = 'feedhandler.log' conf['log'] = {}
section['log']['level'] = 'DEBUG' conf['log']['disabled'] = True
section['log']['disabled'] = True
if section is None: if section is None:
log.warning(f'No config section found for deribit in {path}') log.warning(f'No config section found for deribit in {path}')
return {}
return section return conf
def get_fh_config() -> dict[str, Any]:
conf_option = get_config().get('option', {})
conf_log = get_config().get('log', {})
return {
'log': {
'filename': conf_log.get('filename'),
'level': conf_log.get('level'),
'disabled': conf_log.get('disabled')
},
'deribit': {
'key_id': conf_option.get('api_key'),
'key_secret': conf_option.get('api_secret')
}
}
class Client: class Client:
def __init__( def __init__(self, json_rpc: Callable) -> None:
self, self._pairs: dict[str, Any] = None
json_rpc: Callable config = get_config().get('deribit', {})
) -> None: if ('key_id' in config) and ('key_secret' in config):
self._pairs: ChainMap[str, Pair] = ChainMap() self._key_id = config['key_id']
self._key_secret = config['key_secret']
config = get_config().get('option', {}) else:
self._key_id = None
self._key_id = config.get('api_key') self._key_secret = None
self._key_secret = config.get('api_secret')
self.json_rpc = json_rpc self.json_rpc = json_rpc
@ -283,10 +228,7 @@ class Client:
def currencies(self): def currencies(self):
return ['btc', 'eth', 'sol', 'usd'] return ['btc', 'eth', 'sol', 'usd']
async def get_balances( async def get_balances(self, kind: str = 'option') -> dict[str, float]:
self,
kind: str = 'option'
) -> dict[str, float]:
"""Return the set of positions for this account """Return the set of positions for this account
by symbol. by symbol.
""" """
@ -302,39 +244,20 @@ class Client:
return balances return balances
async def get_assets( async def get_assets(self) -> dict[str, float]:
self,
venue: str | None = None,
) -> dict[str, Asset]:
"""Return the set of asset balances for this account """Return the set of asset balances for this account
by symbol. by symbol.
""" """
assets = {} balances = {}
resp = await self.json_rpc(
'private/get_account_summaries',
params={
'extended' : True
}
)
summaries = resp.result['summaries']
for summary in summaries:
currency = summary['currency']
tx_tick = Decimal('1e-08')
atype='crypto_currency'
assets[currency] = Asset(
name=currency,
atype=atype,
tx_tick=tx_tick)
return assets
async def get_mkt_pairs(self) -> dict[str, Pair]: for currency in self.currencies:
flat: dict[str, Pair] = {} resp = await self.json_rpc(
for key in self._pairs: 'private/get_account_summary', params={
item = self._pairs.get(key) 'currency': currency.upper()})
flat[item.bs_fqme] = item
return flat balances[currency] = resp.result['balance']
return balances
async def submit_limit( async def submit_limit(
self, self,
@ -363,28 +286,6 @@ class Client:
'private/cancel', {'order_id': oid}) 'private/cancel', {'order_id': oid})
return resp.result return resp.result
async def exch_info(
self,
sym: str | None = None,
venue: MarketType | None = None,
expiry: str | None = None,
) -> dict[str, Pair] | Pair:
pair_table: dict[str, Pair] = self._pairs
if (
sym
and (cached_pair := pair_table.get(sym))
):
return cached_pair
if sym:
return pair_table[sym.lower()]
else:
return self._pairs
async def symbol_info( async def symbol_info(
self, self,
instrument: Optional[str] = None, instrument: Optional[str] = None,
@ -392,7 +293,7 @@ class Client:
kind: str = 'option', kind: str = 'option',
expired: bool = False expired: bool = False
) -> dict[str, Pair] | Pair: ) -> dict[str, dict]:
''' '''
Get symbol infos. Get symbol infos.
@ -412,29 +313,14 @@ class Client:
params, params,
) )
# convert to symbol-keyed table # convert to symbol-keyed table
pair_type: Type = PAIRTYPES[kind]
results: list[dict] | None = resp.result results: list[dict] | None = resp.result
instruments: dict[str, dict] = {
instruments: dict[str, Pair] = {} item['instrument_name'].lower(): item
for item in results: for item in results
symbol=item['instrument_name'].lower() }
try:
pair: Pair = pair_type(
symbol=symbol,
**item
)
except Exception as e:
e.add_note(
"\nDon't panic, prolly stupid deribit changed their symbology schema again..\n"
'Check out their API docs here:\n\n'
'https://docs.deribit.com/?python#deribit-api-v2-1-1'
)
raise
instruments[symbol] = pair
if instrument is not None: if instrument is not None:
return instruments[instrument.lower()] return instruments[instrument]
else: else:
return instruments return instruments
@ -451,12 +337,12 @@ class Client:
self, self,
pattern: str, pattern: str,
limit: int = 30, limit: int = 30,
) -> dict[str, Pair]: ) -> dict[str, Any]:
''' '''
Fuzzy search symbology set for pairs matching `pattern`. Fuzzy search symbology set for pairs matching `pattern`.
''' '''
pairs: dict[str, Pair] = await self.symbol_info() pairs: dict[str, Any] = await self.symbol_info()
matches: dict[str, Pair] = match_from_pairs( matches: dict[str, Pair] = match_from_pairs(
pairs=pairs, pairs=pairs,
query=pattern.upper(), query=pattern.upper(),
@ -472,19 +358,16 @@ class Client:
async def bars( async def bars(
self, self,
mkt: MktPair, symbol: str,
start_dt: Optional[datetime] = None, start_dt: Optional[datetime] = None,
end_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None,
limit: int = 1000, limit: int = 1000,
as_np: bool = True, as_np: bool = True,
) -> dict:
) -> list[tuple] | np.ndarray: instrument = symbol
instrument: str = mkt.bs_fqme
if end_dt is None: if end_dt is None:
end_dt = now('UTC') end_dt = pendulum.now('UTC')
if start_dt is None: if start_dt is None:
start_dt = end_dt.start_of( start_dt = end_dt.start_of(
@ -504,27 +387,29 @@ class Client:
}) })
result = KLinesResult(**resp.result) result = KLinesResult(**resp.result)
new_bars: list[tuple] = [] new_bars = []
for i in range(len(result.close)): for i in range(len(result.close)):
row = [ _open = result.open[i]
high = result.high[i]
low = result.low[i]
close = result.close[i]
volume = result.volume[i]
row = [
(start_time + (i * (60 * 1000))) / 1000.0, # time (start_time + (i * (60 * 1000))) / 1000.0, # time
result.open[i], result.open[i],
result.high[i], result.high[i],
result.low[i], result.low[i],
result.close[i], result.close[i],
result.volume[i] result.volume[i],
0
] ]
new_bars.append((i,) + tuple(row)) new_bars.append((i,) + tuple(row))
if not as_np: array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines
return result return array
return np.array(
new_bars,
dtype=def_iohlcv_fields
)
async def last_trades( async def last_trades(
self, self,
@ -549,10 +434,10 @@ async def get_client(
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as n,
open_jsonrpc_session( open_jsonrpc_session(
_ws_url, response_type=JSONRPCResult _testnet_ws_url, dtype=JSONRPCResult) as json_rpc
) as json_rpc
): ):
client = Client(json_rpc) client = Client(json_rpc)
_refresh_token: Optional[str] = None _refresh_token: Optional[str] = None
_access_token: Optional[str] = None _access_token: Optional[str] = None
@ -622,7 +507,7 @@ async def get_client(
@acm @acm
async def open_feed_handler(): async def open_feed_handler():
fh = FeedHandler(config=get_fh_config()) fh = FeedHandler(config=get_config())
yield fh yield fh
await to_asyncio.run_task(fh.stop_async) await to_asyncio.run_task(fh.stop_async)
@ -638,7 +523,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay( async def aio_price_feed_relay(
fh: FeedHandler, fh: FeedHandler,
instrument: str, instrument: Symbol,
from_trio: asyncio.Queue, from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
) -> None: ) -> None:
@ -657,33 +542,21 @@ async def aio_price_feed_relay(
'symbol': cb_sym_to_deribit_inst( 'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(), str_to_cb_sym(data.symbol)).lower(),
'ticks': [ 'ticks': [
{ {'type': 'bid',
'type': 'bid', 'price': float(data.bid_price), 'size': float(data.bid_size)},
'price': float(data.bid_price), {'type': 'bsize',
'size': float(data.bid_size) 'price': float(data.bid_price), 'size': float(data.bid_size)},
}, {'type': 'ask',
{ 'price': float(data.ask_price), 'size': float(data.ask_size)},
'type': 'bsize', {'type': 'asize',
'price': float(data.bid_price), 'price': float(data.ask_price), 'size': float(data.ask_size)}
'size': float(data.bid_size)
},
{
'type': 'ask',
'price': float(data.ask_price),
'size': float(data.ask_size)
},
{
'type': 'asize',
'price': float(data.ask_price),
'size': float(data.ask_size)
}
] ]
})) }))
sym: Symbol = piker_sym_to_cb_sym(instrument)
fh.add_feed( fh.add_feed(
DERIBIT, DERIBIT,
channels=[TRADES, L1_BOOK], channels=[TRADES, L1_BOOK],
symbols=[sym], symbols=[piker_sym_to_cb_sym(instrument)],
callbacks={ callbacks={
TRADES: _trade, TRADES: _trade,
L1_BOOK: _l1 L1_BOOK: _l1
@ -724,9 +597,9 @@ async def maybe_open_price_feed(
async with maybe_open_context( async with maybe_open_context(
acm_func=open_price_feed, acm_func=open_price_feed,
kwargs={ kwargs={
'instrument': instrument.split('.')[0] 'instrument': instrument
}, },
key=f'{instrument.split('.')[0]}-price', key=f'{instrument}-price',
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
yield broadcast_receiver(feed, 10) yield broadcast_receiver(feed, 10)
@ -791,10 +664,10 @@ async def maybe_open_order_feed(
async with maybe_open_context( async with maybe_open_context(
acm_func=open_order_feed, acm_func=open_order_feed,
kwargs={ kwargs={
'instrument': instrument.split('.')[0], 'instrument': instrument,
'fh': fh 'fh': fh
}, },
key=f'{instrument.split('.')[0]}-order', key=f'{instrument}-order',
) as (cache_hit, feed): ) as (cache_hit, feed):
if cache_hit: if cache_hit:
yield broadcast_receiver(feed, 10) yield broadcast_receiver(feed, 10)

View File

@ -21,33 +21,18 @@ Deribit backend.
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from datetime import datetime from datetime import datetime
from typing import Any, Optional, Callable from typing import Any, Optional, Callable
from pprint import pformat
import time import time
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from pendulum import ( import pendulum
from_timestamp,
now,
)
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
import tractor import tractor
from piker.accounting import ( from piker.brokers import open_cached_client
MktPair,
unpack_fqme,
)
from piker.brokers import (
open_cached_client,
NoData,
)
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import get_logger, get_console_log from piker.log import get_logger, get_console_log
from piker.data import ShmArray from piker.data import ShmArray
from piker.data.validate import FeedInit
from piker.brokers._util import ( from piker.brokers._util import (
BrokerError, BrokerError,
DataUnavailable, DataUnavailable,
@ -62,13 +47,9 @@ from cryptofeed.symbols import Symbol
from .api import ( from .api import (
Client, Trade, Client, Trade,
get_config, get_config,
piker_sym_to_cb_sym, cb_sym_to_deribit_inst, str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
maybe_open_price_feed maybe_open_price_feed
) )
from .venues import (
Pair,
OptionPair,
)
_spawn_kwargs = { _spawn_kwargs = {
'infect_asyncio': True, 'infect_asyncio': True,
@ -83,119 +64,36 @@ async def open_history_client(
mkt: MktPair, mkt: MktPair,
) -> tuple[Callable, int]: ) -> tuple[Callable, int]:
fnstrument: str = mkt.bs_fqme
# TODO implement history getter for the new storage layer. # TODO implement history getter for the new storage layer.
async with open_cached_client('deribit') as client: async with open_cached_client('deribit') as client:
async def get_ohlc( async def get_ohlc(
timeframe: float, end_dt: Optional[datetime] = None,
end_dt: datetime | None = None, start_dt: Optional[datetime] = None,
start_dt: datetime | None = None,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
datetime, # start datetime, # start
datetime, # end datetime, # end
]: ]:
if timeframe != 60:
raise DataUnavailable('Only 1m bars are supported')
array: np.ndarray = await client.bars( array = await client.bars(
mkt, instrument,
start_dt=start_dt, start_dt=start_dt,
end_dt=end_dt, end_dt=end_dt,
) )
if len(array) == 0: if len(array) == 0:
raise NoData( raise DataUnavailable
f'No frame for {start_dt} -> {end_dt}\n'
)
start_dt = from_timestamp(array[0]['time']) start_dt = pendulum.from_timestamp(array[0]['time'])
end_dt = from_timestamp(array[-1]['time']) end_dt = pendulum.from_timestamp(array[-1]['time'])
times = array['time']
if not times.any():
raise ValueError(
'Bad frame with null-times?\n\n'
f'{times}'
)
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
await tractor.pause()
return array, start_dt, end_dt return array, start_dt, end_dt
yield get_ohlc, {'erlangs': 3, 'rate': 3} yield get_ohlc, {'erlangs': 3, 'rate': 3}
@async_lifo_cache()
async def get_mkt_info(
fqme: str,
) -> tuple[MktPair, Pair] | None:
# uppercase since kraken bs_mktid is always upper
if 'deribit' not in fqme.lower():
fqme += '.deribit'
mkt_mode: str = ''
broker, mkt_ep, venue, expiry = unpack_fqme(fqme)
# NOTE: we always upper case all tokens to be consistent with
# binance's symbology style for pairs, like `BTCUSDT`, but in
# theory we could also just keep things lower case; as long as
# we're consistent and the symcache matches whatever this func
# returns, always!
expiry: str = expiry.upper()
venue: str = venue.upper()
venue_lower: str = venue.lower()
mkt_mode: str = 'option'
async with open_cached_client(
'deribit',
) as client:
assets: dict[str, Asset] = await client.get_assets()
pair_str: str = mkt_ep.lower()
# switch venue-mode depending on input pattern parsing
# since we want to use a particular endpoint (set) for
# pair info lookup!
client.mkt_mode = mkt_mode
pair: Pair = await client.exch_info(
sym=pair_str,
)
dst: Asset | None = assets.get(pair.bs_dst_asset)
if (
not dst
# TODO: a known asset DNE list?
# and pair.baseAsset == 'DEFI'
):
log.warning(
f'UNKNOWN {venue} asset {pair.base_currency} from,\n'
f'{pformat(pair.to_dict())}'
)
# XXX UNKNOWN missing "asset", though no idea why?
# maybe it's only avail in the margin venue(s): /dapi/ ?
return None
mkt = MktPair(
dst=dst,
src=assets.get(pair.bs_src_asset),
price_tick=pair.price_tick,
size_tick=pair.size_tick,
bs_mktid=pair.symbol,
expiry=expiry,
venue=venue,
broker='deribit',
)
return mkt, pair
async def stream_quotes( async def stream_quotes(
send_chan: trio.abc.SendChannel, send_chan: trio.abc.SendChannel,
@ -212,20 +110,25 @@ async def stream_quotes(
sym = symbols[0] sym = symbols[0]
init_msgs: list[FeedInit] = []
async with ( async with (
open_cached_client('deribit') as client, open_cached_client('deribit') as client,
send_chan as send_chan send_chan as send_chan
): ):
mkt, pair = await get_mkt_info(sym) init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': {
'asset_type': 'option',
'price_tick_size': 0.0005
},
'shm_write_opts': {'sum_tick_vml': False},
'fqsn': sym,
},
}
# build out init msgs according to latest spec nsym = piker_sym_to_cb_sym(sym)
init_msgs.append(
FeedInit(mkt_info=mkt)
)
nsym = piker_sym_to_cb_sym(sym.split('.')[0])
async with maybe_open_price_feed(sym) as stream: async with maybe_open_price_feed(sym) as stream:
@ -276,16 +179,7 @@ async def open_symbol_search(
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pattern: str
async for pattern in stream: async for pattern in stream:
# NOTE: pattern fuzzy-matching is done within # repack in dict form
# the methd impl. await stream.send(
pairs: dict[str, Pair] = await client.search_symbols( await client.search_symbols(pattern))
pattern,
)
# repack in fqme-keyed table
byfqme: dict[str, Pair] = {}
for pair in pairs.values():
byfqme[pair.bs_fqme] = pair
await stream.send(byfqme)

View File

@ -1,142 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Per market data-type definitions and schemas types.
"""
from __future__ import annotations
from typing import (
Literal,
)
from decimal import Decimal
from msgspec import field
from piker.types import Struct
# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'
# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'
# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'
MarketType = Literal[
'option'
]
def get_api_eps(venue: MarketType) -> tuple[str, str]:
'''
Return API ep root paths per venue.
'''
return {
'option': (
_ws_url,
),
}[venue]
class Pair(Struct, frozen=True, kw_only=True):
symbol: str
# src
quote_currency: str # 'BTC'
# dst
base_currency: str # "BTC",
tick_size: float # 0.0001
tick_size_steps: list[dict[str, str | int | float]] # [{'above_price': 0.005, 'tick_size': 0.0005}]
@property
def price_tick(self) -> Decimal:
step_size: float = self.tick_size_steps[0].get('above_price')
return Decimal(step_size)
@property
def size_tick(self) -> Decimal:
step_size: float = self.tick_size_steps[0].get('tick_size')
return Decimal(step_size)
@property
def bs_fqme(self) -> str:
return self.symbol
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
class OptionPair(Pair, frozen=True, kw_only=True):
taker_commission: float # 0.0003
strike: float # 5000.0
settlement_period: str # 'day'
settlement_currency: str # "BTC",
rfq: bool # false
price_index: str # 'btc_usd'
option_type: str # 'call'
min_trade_amount: float # 0.1
maker_commission: float # 0.0003
kind: str # 'option'
is_active: bool # true
instrument_type: str # 'reversed'
instrument_name: str # 'BTC-1SEP24-55000-C'
instrument_id: int # 364671
expiration_timestamp: int # 1725177600000
creation_timestamp: int # 1724918461000
counter_currency: str # 'USD'
contract_size: float # '1.0'
block_trade_tick_size: float # '0.0001'
block_trade_min_trade_amount: int # '25'
block_trade_commission: float # '0.003'
# NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.deribit:OptionPair'
@property
def venue(self) -> str:
return 'OPTION'
@property
def bs_fqme(self) -> str:
return f'{self.symbol}.OPTION'
@property
def bs_src_asset(self) -> str:
return f'{self.quote_currency}'
@property
def bs_dst_asset(self) -> str:
return f'{self.base_currency}'
@property
def bs_mktid(self) -> str:
return f'{self.symbol}.{self.venue}'
PAIRTYPES: dict[MarketType, Pair] = {
'option': OptionPair,
}

View File

@ -111,10 +111,6 @@ class KucoinMktPair(Struct, frozen=True):
quoteMaxSize: float quoteMaxSize: float
quoteMinSize: float quoteMinSize: float
symbol: str # our bs_mktid, kucoin's internal id symbol: str # our bs_mktid, kucoin's internal id
feeCategory: int
makerFeeCoefficient: float
takerFeeCoefficient: float
st: bool
class AccountTrade(Struct, frozen=True): class AccountTrade(Struct, frozen=True):
@ -597,7 +593,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
''' '''
async with ( async with (
httpx.AsyncClient( httpx.AsyncClient(
base_url='https://api.kucoin.com/api', base_url=f'https://api.kucoin.com/api',
) as trio_client, ) as trio_client,
): ):
client = Client(httpx_client=trio_client) client = Client(httpx_client=trio_client)
@ -641,7 +637,7 @@ async def open_ping_task(
await trio.sleep((ping_interval - 1000) / 1000) await trio.sleep((ping_interval - 1000) / 1000)
await ws.send_msg({'id': connect_id, 'type': 'ping'}) await ws.send_msg({'id': connect_id, 'type': 'ping'})
log.warning('Starting ping task for kucoin ws connection') log.info('Starting ping task for kucoin ws connection')
n.start_soon(ping_server) n.start_soon(ping_server)
yield yield
@ -653,14 +649,9 @@ async def open_ping_task(
async def get_mkt_info( async def get_mkt_info(
fqme: str, fqme: str,
) -> tuple[ ) -> tuple[MktPair, KucoinMktPair]:
MktPair,
KucoinMktPair,
]:
''' '''
Query for and return both a `piker.accounting.MktPair` and Query for and return a `MktPair` and `KucoinMktPair`.
`KucoinMktPair` from provided `fqme: str`
(fully-qualified-market-endpoint).
''' '''
async with open_cached_client('kucoin') as client: async with open_cached_client('kucoin') as client:
@ -735,8 +726,6 @@ async def stream_quotes(
log.info(f'Starting up quote stream(s) for {symbols}') log.info(f'Starting up quote stream(s) for {symbols}')
for sym_str in symbols: for sym_str in symbols:
mkt: MktPair
pair: KucoinMktPair
mkt, pair = await get_mkt_info(sym_str) mkt, pair = await get_mkt_info(sym_str)
init_msgs.append( init_msgs.append(
FeedInit(mkt_info=mkt) FeedInit(mkt_info=mkt)
@ -744,11 +733,7 @@ async def stream_quotes(
ws: NoBsWs ws: NoBsWs
token, ping_interval = await client._get_ws_token() token, ping_interval = await client._get_ws_token()
log.info('API reported ping_interval: {ping_interval}\n') connect_id = str(uuid4())
connect_id: str = str(uuid4())
typ: str
quote: dict
async with ( async with (
open_autorecon_ws( open_autorecon_ws(
( (
@ -762,37 +747,20 @@ async def stream_quotes(
), ),
) as ws, ) as ws,
open_ping_task(ws, ping_interval, connect_id), open_ping_task(ws, ping_interval, connect_id),
aclosing( aclosing(stream_messages(ws, sym_str)) as msg_gen,
iter_normed_quotes(
ws, sym_str
)
) as iter_quotes,
): ):
typ, quote = await anext(iter_quotes) typ, quote = await anext(msg_gen)
# take care to not unblock here until we get a real while typ != 'trade':
# trade quote? # take care to not unblock here until we get a real
# ^TODO, remove this right? # trade quote
# -[ ] what often blocks chart boot/new-feed switching typ, quote = await anext(msg_gen)
# since we'ere waiting for a live quote instead of just
# loading history afap..
# |_ XXX, not sure if we require a bit of rework to core
# feed init logic or if backends justg gotta be
# changed up.. feel like there was some causality
# dilema prolly only seen with IB too..
# while typ != 'trade':
# typ, quote = await anext(iter_quotes)
task_status.started((init_msgs, quote)) task_status.started((init_msgs, quote))
feed_is_live.set() feed_is_live.set()
# XXX NOTE, DO NOT include the `.<backend>` suffix! async for typ, msg in msg_gen:
# OW the sampling loop will not broadcast correctly.. await send_chan.send({sym_str: msg})
# since `bus._subscribers.setdefault(bs_fqme, set())`
# is used inside `.data.open_feed_bus()` !!!
topic: str = mkt.bs_fqme
async for typ, quote in iter_quotes:
await send_chan.send({topic: quote})
@acm @acm
@ -847,7 +815,7 @@ async def subscribe(
) )
async def iter_normed_quotes( async def stream_messages(
ws: NoBsWs, ws: NoBsWs,
sym: str, sym: str,
@ -878,9 +846,6 @@ async def iter_normed_quotes(
yield 'trade', { yield 'trade', {
'symbol': sym, 'symbol': sym,
# TODO, is 'last' even used elsewhere/a-good
# semantic? can't we just read the ticks with our
# .data.ticktools.frame_ticks()`/
'last': trade_data.price, 'last': trade_data.price,
'brokerd_ts': last_trade_ts, 'brokerd_ts': last_trade_ts,
'ticks': [ 'ticks': [
@ -973,7 +938,7 @@ async def open_history_client(
if end_dt is None: if end_dt is None:
inow = round(time.time()) inow = round(time.time())
log.debug( print(
f'difference in time between load and processing' f'difference in time between load and processing'
f'{inow - times[-1]}' f'{inow - times[-1]}'
) )

View File

@ -653,11 +653,7 @@ class Router(Struct):
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
first_quote: dict = flume.first_quote first_quote: dict = flume.first_quote
book: DarkBook = self.get_dark_book(broker) book: DarkBook = self.get_dark_book(broker)
book.lasts[fqme]: float = float(first_quote['last'])
if not (last := first_quote.get('last')):
last: float = flume.rt_shm.array[-1]['close']
book.lasts[fqme]: float = float(last)
async with self.maybe_open_brokerd_dialog( async with self.maybe_open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
@ -720,7 +716,7 @@ class Router(Struct):
subs = self.subscribers[sub_key] subs = self.subscribers[sub_key]
sent_some: bool = False sent_some: bool = False
for client_stream in subs.copy(): for client_stream in subs:
try: try:
await client_stream.send(msg) await client_stream.send(msg)
sent_some = True sent_some = True
@ -1014,14 +1010,10 @@ async def translate_and_relay_brokerd_events(
status_msg.brokerd_msg = msg status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name'] status_msg.src = msg.broker_details['name']
if not status_msg.req: await router.client_broadcast(
# likely some order change state? status_msg.req.symbol,
await tractor.pause() status_msg,
else: )
await router.client_broadcast(
status_msg.req.symbol,
status_msg,
)
if status == 'closed': if status == 'closed':
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')

View File

@ -653,7 +653,6 @@ async def open_trade_dialog(
# in) use manually constructed table from calling # in) use manually constructed table from calling
# the `.get_mkt_info()` provider EP above. # the `.get_mkt_info()` provider EP above.
_mktmap_table=mkt_by_fqme, _mktmap_table=mkt_by_fqme,
only_require=list(mkt_by_fqme),
) )
pp_msgs: list[BrokerdPosition] = [] pp_msgs: list[BrokerdPosition] = []

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_registry( tractor.get_arbiter(
host=host, host=host,
port=ports[0] port=ports[0]
) as portal ) as portal

View File

@ -25,7 +25,6 @@ from collections import (
defaultdict, defaultdict,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial
import time import time
from typing import ( from typing import (
Any, Any,
@ -43,7 +42,7 @@ from tractor.trionics import (
maybe_open_nursery, maybe_open_nursery,
) )
import trio import trio
from trio import TaskStatus from trio_typing import TaskStatus
from .ticktools import ( from .ticktools import (
frame_ticks, frame_ticks,
@ -71,7 +70,6 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0 _default_delay_s: float = 1.0
# TODO: use new `tractor.singleton_acm` API for this!
class Sampler: class Sampler:
''' '''
Global sampling engine registry. Global sampling engine registry.
@ -81,9 +79,9 @@ class Sampler:
This non-instantiated type is meant to be a singleton within This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to a `samplerd` actor-service spawned once by the user wishing to
time-step-sample a (real-time) quote feeds, see time-step-sample (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below ``.service.maybe_open_samplerd()`` and the below
`register_with_sampler()`. ``register_with_sampler()``.
''' '''
service_nursery: None | trio.Nursery = None service_nursery: None | trio.Nursery = None
@ -377,10 +375,7 @@ async def register_with_sampler(
assert Sampler.ohlcv_shms assert Sampler.ohlcv_shms
# unblock caller # unblock caller
await ctx.started( await ctx.started(set(Sampler.ohlcv_shms.keys()))
# XXX bc msgpack only allows one array type!
list(Sampler.ohlcv_shms.keys())
)
if open_index_stream: if open_index_stream:
try: try:
@ -424,6 +419,7 @@ async def register_with_sampler(
async def spawn_samplerd( async def spawn_samplerd(
loglevel: str | None = None, loglevel: str | None = None,
**extra_tractor_kwargs **extra_tractor_kwargs
@ -433,10 +429,7 @@ async def spawn_samplerd(
update and increment count write and stream broadcasting. update and increment count write and stream broadcasting.
''' '''
from piker.service import ( from piker.service import Services
get_service_mngr,
ServiceMngr,
)
dname = 'samplerd' dname = 'samplerd'
log.info(f'Spawning `{dname}`') log.info(f'Spawning `{dname}`')
@ -444,33 +437,26 @@ async def spawn_samplerd(
# singleton lock creation of ``samplerd`` since we only ever want # singleton lock creation of ``samplerd`` since we only ever want
# one daemon per ``pikerd`` proc tree. # one daemon per ``pikerd`` proc tree.
# TODO: make this built-into the service api? # TODO: make this built-into the service api?
mngr: ServiceMngr = get_service_mngr() async with Services.locks[dname + '_singleton']:
already_started: bool = dname in mngr.service_tasks
async with mngr._locks[dname + '_singleton']: if dname not in Services.service_tasks:
ctx: Context = await mngr.start_service(
daemon_name=dname, portal = await Services.actor_n.start_actor(
ctx_ep=partial( dname,
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs
)
await Services.start_service_task(
dname,
portal,
register_with_sampler, register_with_sampler,
period_s=1, period_s=1,
sub_for_broadcasts=False, sub_for_broadcasts=False,
),
debug_mode=mngr.debug_mode, # set by pikerd flag
# proxy-through to tractor
enable_modules=[
'piker.data._sampling',
],
loglevel=loglevel,
**extra_tractor_kwargs
)
if not already_started:
assert (
ctx
and
ctx.portal
and
not ctx.cancel_called
) )
return True return True
@ -903,7 +889,6 @@ async def uniform_rate_send(
# to consumers which crash or lose network connection. # to consumers which crash or lose network connection.
# I.e. we **DO NOT** want to crash and propagate up to # I.e. we **DO NOT** want to crash and propagate up to
# ``pikerd`` these kinds of errors! # ``pikerd`` these kinds of errors!
trio.EndOfChannel,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,
ConnectionResetError, ConnectionResetError,

View File

@ -30,11 +30,7 @@ Actor runtime primtives and (distributed) service APIs for,
=> TODO: maybe to (re)move elsewhere? => TODO: maybe to (re)move elsewhere?
''' '''
from ._mngr import ( from ._mngr import Services as Services
get_service_mngr as get_service_mngr,
open_service_mngr as open_service_mngr,
ServiceMngr as ServiceMngr,
)
from ._registry import ( from ._registry import (
_tractor_kwargs as _tractor_kwargs, _tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr, _default_reg_addr as _default_reg_addr,

View File

@ -21,6 +21,7 @@
from __future__ import annotations from __future__ import annotations
import os import os
from typing import ( from typing import (
Optional,
Any, Any,
ClassVar, ClassVar,
) )
@ -29,13 +30,13 @@ from contextlib import (
) )
import tractor import tractor
import trio
from ._util import ( from ._util import (
get_console_log, get_console_log,
) )
from ._mngr import ( from ._mngr import (
open_service_mngr, Services,
ServiceMngr,
) )
from ._registry import ( # noqa from ._registry import ( # noqa
_tractor_kwargs, _tractor_kwargs,
@ -58,7 +59,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [], registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [], enable_modules: list[str] = [],
loglevel: str|None = None, loglevel: Optional[str] = None,
# XXX NOTE XXX: you should pretty much never want debug mode # XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production. # for data daemons when running in production.
@ -68,7 +69,7 @@ async def open_piker_runtime(
# and spawn the service tree distributed per that. # and spawn the service tree distributed per that.
start_method: str = 'trio', start_method: str = 'trio',
tractor_runtime_overrides: dict|None = None, tractor_runtime_overrides: dict | None = None,
**tractor_kwargs, **tractor_kwargs,
) -> tuple[ ) -> tuple[
@ -118,10 +119,6 @@ async def open_piker_runtime(
# spawn other specialized daemons I think? # spawn other specialized daemons I think?
enable_modules=enable_modules, enable_modules=enable_modules,
# TODO: how to configure this?
# keep it on by default if debug mode is set?
# maybe_enable_greenback=debug_mode,
**tractor_kwargs, **tractor_kwargs,
) as actor, ) as actor,
@ -170,13 +167,12 @@ async def open_pikerd(
**kwargs, **kwargs,
) -> ServiceMngr: ) -> Services:
''' '''
Start a root piker daemon actor (aka `pikerd`) with an indefinite Start a root piker daemon with an indefinite lifetime.
lifetime.
A root actor-nursery is created which can be used to spawn and A root actor nursery is created which can be used to create and keep
supervise underling service sub-actors (see below). alive underling services (see below).
''' '''
# NOTE: for the root daemon we always enable the root # NOTE: for the root daemon we always enable the root
@ -203,6 +199,8 @@ async def open_pikerd(
root_actor, root_actor,
reg_addrs, reg_addrs,
), ),
tractor.open_nursery() as actor_nursery,
trio.open_nursery() as service_nursery,
): ):
for addr in reg_addrs: for addr in reg_addrs:
if addr not in root_actor.accept_addrs: if addr not in root_actor.accept_addrs:
@ -211,17 +209,25 @@ async def open_pikerd(
'Maybe you have another daemon already running?' 'Maybe you have another daemon already running?'
) )
mngr: ServiceMngr # assign globally for future daemon/task creation
async with open_service_mngr( Services.actor_n = actor_nursery
debug_mode=debug_mode, Services.service_n = service_nursery
) as mngr: Services.debug_mode = debug_mode
yield mngr
try:
yield Services
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in Services.service_tasks:
# await Services.cancel_service('samplerd')
service_nursery.cancel_scope.cancel()
# TODO: do we even need this? # TODO: do we even need this?
# @acm # @acm
# async def maybe_open_runtime( # async def maybe_open_runtime(
# loglevel: str|None = None, # loglevel: Optional[str] = None,
# **kwargs, # **kwargs,
# ) -> None: # ) -> None:
@ -250,7 +256,7 @@ async def maybe_open_pikerd(
loglevel: str | None = None, loglevel: str | None = None,
**kwargs, **kwargs,
) -> tractor._portal.Portal | ClassVar[ServiceMngr]: ) -> tractor._portal.Portal | ClassVar[Services]:
''' '''
If no ``pikerd`` daemon-root-actor can be found start it and If no ``pikerd`` daemon-root-actor can be found start it and
yield up (we should probably figure out returning a portal to self yield up (we should probably figure out returning a portal to self

View File

@ -49,7 +49,7 @@ from requests.exceptions import (
ReadTimeout, ReadTimeout,
) )
from ._mngr import ServiceMngr from ._mngr import Services
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -453,7 +453,7 @@ async def open_ahabd(
@acm @acm
async def start_ahab_service( async def start_ahab_service(
services: ServiceMngr, services: Services,
service_name: str, service_name: str,
# endpoint config passed as **kwargs # endpoint config passed as **kwargs
@ -549,8 +549,7 @@ async def start_ahab_service(
log.warning('Failed to cancel root permsed container') log.warning('Failed to cancel root permsed container')
except ( except (
# trio.MultiError, trio.MultiError,
ExceptionGroup,
) as err: ) as err:
for subexc in err.exceptions: for subexc in err.exceptions:
if isinstance(subexc, PermissionError): if isinstance(subexc, PermissionError):

View File

@ -26,17 +26,14 @@ from typing import (
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from collections import defaultdict
import tractor import tractor
import trio
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
) )
from ._mngr import ( from ._mngr import (
get_service_mngr, Services,
ServiceMngr,
) )
from ._actor_runtime import maybe_open_pikerd from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service from ._registry import find_service
@ -44,14 +41,15 @@ from ._registry import find_service
@acm @acm
async def maybe_spawn_daemon( async def maybe_spawn_daemon(
service_name: str, service_name: str,
service_task_target: Callable, service_task_target: Callable,
spawn_args: dict[str, Any], spawn_args: dict[str, Any],
loglevel: str | None = None, loglevel: str | None = None,
singleton: bool = False, singleton: bool = False,
_locks = defaultdict(trio.Lock),
**pikerd_kwargs, **pikerd_kwargs,
) -> tractor.Portal: ) -> tractor.Portal:
@ -69,7 +67,7 @@ async def maybe_spawn_daemon(
''' '''
# serialize access to this section to avoid # serialize access to this section to avoid
# 2 or more tasks racing to create a daemon # 2 or more tasks racing to create a daemon
lock = _locks[service_name] lock = Services.locks[service_name]
await lock.acquire() await lock.acquire()
async with find_service( async with find_service(
@ -134,65 +132,7 @@ async def maybe_spawn_daemon(
async with tractor.wait_for_actor(service_name) as portal: async with tractor.wait_for_actor(service_name) as portal:
lock.release() lock.release()
yield portal yield portal
# --- ---- --- await portal.cancel_actor()
# XXX NOTE XXX
# --- ---- ---
# DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
#
# Doing so will cause an "out-of-band" ctxc
# (`tractor.ContextCancelled`) to be raised inside the
# `ServiceMngr.open_context_in_task()`'s call to
# `ctx.wait_for_result()` AND the internal self-ctxc
# "graceful capture" WILL NOT CATCH IT!
#
# This can cause certain types of operations to raise
# that ctxc BEFORE THEY `return`, resulting in
# a "false-negative" ctxc being raised when really
# nothing actually failed, other then our semantic
# "failure" to suppress an expected, graceful,
# self-cancel scenario..
#
# bUt wHy duZ It WorK lIKe dis..
# ------------------------------
# from the perspective of the `tractor.Context` this
# cancel request was conducted "out of band" since
# `Context.cancel()` was never called and thus the
# `._cancel_called: bool` was never set. Despite the
# remote `.canceller` being set to `pikerd` (i.e. the
# same `Actor.uid` of the raising service-mngr task) the
# service-task's ctx itself was never marked as having
# requested cancellation and thus still raises the ctxc
# bc it was unaware of any such request.
#
# How to make grokin these cases easier tho?
# ------------------------------------------
# Because `Portal.cancel_actor()` was called it requests
# "full-`Actor`-runtime-cancellation" of it's peer
# process which IS NOT THE SAME as a single inter-actor
# RPC task cancelling its local context with a remote
# peer `Task` in that same peer process.
#
# ?TODO? It might be better if we do one (or all) of the
# following:
#
# -[ ] at least set a special message for the
# `ContextCancelled` when raised locally by the
# unaware ctx task such that we check for the
# `.canceller` being *our `Actor`* and in the case
# where `Context._cancel_called == False` we specially
# note that this is likely an "out-of-band"
# runtime-cancel request triggered by some call to
# `Portal.cancel_actor()`, possibly even reporting the
# exact LOC of that caller by tracking it inside our
# portal-type?
# -[ ] possibly add another field `ContextCancelled` like
# maybe a,
# `.request_type: Literal['os', 'proc', 'actor',
# 'ctx']` type thing which would allow immediately
# being able to tell what kind of cancellation caused
# the unexpected ctxc?
# -[ ] REMOVE THIS COMMENT, once we've settled on how to
# better augment `tractor` to be more explicit on this!
async def spawn_emsd( async def spawn_emsd(
@ -207,22 +147,21 @@ async def spawn_emsd(
""" """
log.info('Spawning emsd') log.info('Spawning emsd')
smngr: ServiceMngr = get_service_mngr() portal = await Services.actor_n.start_actor(
portal = await smngr.actor_n.start_actor(
'emsd', 'emsd',
enable_modules=[ enable_modules=[
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
], ],
loglevel=loglevel, loglevel=loglevel,
debug_mode=smngr.debug_mode, # set by pikerd flag debug_mode=Services.debug_mode, # set by pikerd flag
**extra_tractor_kwargs **extra_tractor_kwargs
) )
# non-blocking setup of clearing service # non-blocking setup of clearing service
from ..clearing._ems import _setup_persistent_emsd from ..clearing._ems import _setup_persistent_emsd
await smngr.start_service_task( await Services.start_service_task(
'emsd', 'emsd',
portal, portal,

View File

@ -18,29 +18,16 @@
daemon-service management API. daemon-service management API.
""" """
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
# contextmanager as cm,
)
from collections import defaultdict from collections import defaultdict
from dataclasses import (
dataclass,
field,
)
import functools
import inspect
from typing import ( from typing import (
Callable, Callable,
Any, Any,
) )
import msgspec
import tractor
import trio import trio
from trio import TaskStatus from trio_typing import TaskStatus
import tractor
from tractor import ( from tractor import (
ActorNursery,
current_actor, current_actor,
ContextCancelled, ContextCancelled,
Context, Context,
@ -52,130 +39,6 @@ from ._util import (
) )
# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
# **init_kwargs,
# ) -> ServiceMngr:
# '''
# Note this function body is invoke IFF no existing singleton instance already
# exists in this proc's memory.
# '''
# # setup
# yield ServiceMngr(**init_kwargs)
# # teardown
# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
*,
debug_mode: bool = False,
# impl deat which ensures a single global instance
_singleton: list[ServiceMngr|None] = [None],
**init_kwargs,
) -> ServiceMngr:
'''
Open a multi-subactor-as-service-daemon tree supervisor.
The delivered `ServiceMngr` is a singleton instance for each
actor-process and is allocated on first open and never
de-allocated unless explicitly deleted by al call to
`del_service_mngr()`.
'''
# TODO: factor this an allocation into
# a `._mngr.open_service_mngr()` and put in the
# once-n-only-once setup/`.__aenter__()` part!
# -[ ] how to make this only happen on the `mngr == None` case?
# |_ use `.trionics.maybe_open_context()` (for generic
# async-with-style-only-once of the factory impl, though
# what do we do for the allocation case?
# / `.maybe_open_nursery()` (since for this specific case
# it's simpler?) to activate
async with (
tractor.open_nursery() as an,
trio.open_nursery() as tn,
):
# impl specific obvi..
init_kwargs.update({
'actor_n': an,
'service_n': tn,
})
mngr: ServiceMngr|None
if (mngr := _singleton[0]) is None:
log.info('Allocating a new service mngr!')
mngr = _singleton[0] = ServiceMngr(**init_kwargs)
# TODO: put into `.__aenter__()` section of
# eventual `@singleton_acm` API wrapper.
#
# assign globally for future daemon/task creation
mngr.actor_n = an
mngr.service_n = tn
else:
assert (
mngr.actor_n
and
mngr.service_tn
)
log.info(
'Using extant service mngr!\n\n'
f'{mngr!r}\n' # it has a nice `.__repr__()` of services state
)
try:
# NOTE: this is a singleton factory impl specific detail
# which should be supported in the condensed
# `@singleton_acm` API?
mngr.debug_mode = debug_mode
yield mngr
finally:
# TODO: is this more clever/efficient?
# if 'samplerd' in mngr.service_tasks:
# await mngr.cancel_service('samplerd')
tn.cancel_scope.cancel()
def get_service_mngr() -> ServiceMngr:
'''
Try to get the singleton service-mngr for this actor presuming it
has already been allocated using,
.. code:: python
async with open_<@singleton_acm(func)>() as mngr`
... this block kept open ...
If not yet allocated raise a `ServiceError`.
'''
# https://stackoverflow.com/a/12627202
# https://docs.python.org/3/library/inspect.html#inspect.Signature
maybe_mngr: ServiceMngr|None = inspect.signature(
open_service_mngr
).parameters['_singleton'].default[0]
if maybe_mngr is None:
raise RuntimeError(
'Someone must allocate a `ServiceMngr` using\n\n'
'`async with open_service_mngr()` beforehand!!\n'
)
return maybe_mngr
# TODO: we need remote wrapping and a general soln: # TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the # - factor this into a ``tractor.highlevel`` extension # pack for the
# library. # library.
@ -183,46 +46,31 @@ def get_service_mngr() -> ServiceMngr:
# to the pikerd actor for starting services remotely! # to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns # - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion? # new actors and supervises them to completion?
@dataclass class Services:
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
'''
A multi-subactor-as-service manager.
Spawn, supervise and monitor service/daemon subactors in a SC actor_n: tractor._supervise.ActorNursery
process tree.
'''
actor_n: ActorNursery
service_n: trio.Nursery service_n: trio.Nursery
debug_mode: bool = False # tractor sub-actor debug mode flag debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[ service_tasks: dict[
str, str,
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
Portal, Portal,
trio.Event, trio.Event,
] ]
] = field(default_factory=dict) ] = {}
locks = defaultdict(trio.Lock)
# internal per-service task mutexs
_locks = defaultdict(trio.Lock)
@classmethod
async def start_service_task( async def start_service_task(
self, self,
name: str, name: str,
portal: Portal, portal: Portal,
# TODO: typevar for the return type of the target and then
# use it below for `ctx_res`?
target: Callable, target: Callable,
allow_overruns: bool = False, allow_overruns: bool = False,
**ctx_kwargs, **ctx_kwargs,
) -> (trio.CancelScope, Context, Any): ) -> (trio.CancelScope, Context):
''' '''
Open a context in a service sub-actor, add to a stack Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown. that gets unwound at ``pikerd`` teardown.
@ -235,7 +83,6 @@ class ServiceMngr:
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ tuple[
trio.CancelScope, trio.CancelScope,
Context,
trio.Event, trio.Event,
Any, Any,
] ]
@ -243,87 +90,64 @@ class ServiceMngr:
) -> Any: ) -> Any:
# TODO: use the ctx._scope directly here instead?
# -[ ] actually what semantics do we expect for this
# usage!?
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
try:
async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
) as (ctx, started): async with portal.open_context(
target,
allow_overruns=allow_overruns,
**ctx_kwargs,
# unblock once the remote context has started ) as (ctx, first):
complete = trio.Event()
task_status.started(( # unblock once the remote context has started
cs, complete = trio.Event()
ctx, task_status.started((cs, complete, first))
complete, log.info(
started, f'`pikerd` service {name} started with value {first}'
)) )
log.info( try:
f'`pikerd` service {name} started with value {started}'
)
# wait on any context's return value # wait on any context's return value
# and any final portal result from the # and any final portal result from the
# sub-actor. # sub-actor.
ctx_res: Any = await ctx.wait_for_result() ctx_res: Any = await ctx.result()
# NOTE: blocks indefinitely until cancelled # NOTE: blocks indefinitely until cancelled
# either by error from the target context # either by error from the target context
# function or by being cancelled here by the # function or by being cancelled here by the
# surrounding cancel scope. # surrounding cancel scope.
return ( return (await portal.result(), ctx_res)
await portal.wait_for_result(), except ContextCancelled as ctxe:
ctx_res, canceller: tuple[str, str] = ctxe.canceller
) our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.channel.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
)
else:
raise
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.chan.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service `{name}` was remotely cancelled by a peer?\n'
# TODO: this would be a good spot to use
# a respawn feature Bo
f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'
f'cancellee: {portal.chan.uid}\n' finally:
f'canceller: {canceller}\n' await portal.cancel_actor()
) complete.set()
else: self.service_tasks.pop(name)
raise
finally: cs, complete, first = await self.service_n.start(open_context_in_task)
# NOTE: the ctx MUST be cancelled first if we
# don't want the above `ctx.wait_for_result()` to
# raise a self-ctxc. WHY, well since from the ctx's
# perspective the cancel request will have
# arrived out-out-of-band at the `Actor.cancel()`
# level, thus `Context.cancel_called == False`,
# meaning `ctx._is_self_cancelled() == False`.
# with trio.CancelScope(shield=True):
# await ctx.cancel()
await portal.cancel_actor()
complete.set()
self.service_tasks.pop(name)
cs, sub_ctx, complete, started = await self.service_n.start(
open_context_in_task
)
# store the cancel scope and portal for later cancellation or # store the cancel scope and portal for later cancellation or
# retstart if needed. # retstart if needed.
self.service_tasks[name] = (cs, sub_ctx, portal, complete) self.service_tasks[name] = (cs, portal, complete)
return cs, sub_ctx, started
return cs, first
@classmethod
async def cancel_service( async def cancel_service(
self, self,
name: str, name: str,
@ -334,80 +158,8 @@ class ServiceMngr:
''' '''
log.info(f'Cancelling `pikerd` service {name}') log.info(f'Cancelling `pikerd` service {name}')
cs, sub_ctx, portal, complete = self.service_tasks[name] cs, portal, complete = self.service_tasks[name]
cs.cancel()
# cs.cancel()
await sub_ctx.cancel()
await complete.wait() await complete.wait()
assert name not in self.service_tasks, \
if name in self.service_tasks: f'Serice task for {name} not terminated?'
# TODO: custom err?
# raise ServiceError(
raise RuntimeError(
f'Serice task for {name} not terminated?'
)
# assert name not in self.service_tasks, \
# f'Serice task for {name} not terminated?'
async def start_service(
self,
daemon_name: str,
ctx_ep: Callable, # kwargs must `partial`-ed in!
debug_mode: bool = False,
**tractor_actor_kwargs,
) -> Context:
'''
Start a "service" task in a new sub-actor (daemon) and manage it's lifetime
indefinitely.
Services can be cancelled/shutdown using `.cancel_service()`.
'''
entry: tuple|None = self.service_tasks.get(daemon_name)
if entry:
(cs, sub_ctx, portal, complete) = entry
return sub_ctx
if daemon_name not in self.service_tasks:
portal = await self.actor_n.start_actor(
daemon_name,
debug_mode=( # maybe set globally during allocate
debug_mode
or
self.debug_mode
),
**tractor_actor_kwargs,
)
ctx_kwargs: dict[str, Any] = {}
if isinstance(ctx_ep, functools.partial):
ctx_kwargs: dict[str, Any] = ctx_ep.keywords
ctx_ep: Callable = ctx_ep.func
(cs, sub_ctx, started) = await self.start_service_task(
daemon_name,
portal,
ctx_ep,
**ctx_kwargs,
)
return sub_ctx
# TODO:
# -[ ] factor all the common shit from `.data._sampling`
# and `.brokers._daemon` into here / `ServiceMngr`
# in terms of allocating the `Portal` as part of the
# "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import (
# open_service_mngr,
# ServiceMngr,
# )
# mngr: ServiceMngr|None = None
# with tractor.hilevel.open_service_mngr() as mngr:
# Services = proxy(mngr)

View File

@ -21,13 +21,11 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
# TODO: oof, needs to be changed to `httpx`!
import asks import asks
if TYPE_CHECKING: if TYPE_CHECKING:
import docker import docker
from ._ahab import DockerContainer from ._ahab import DockerContainer
from . import ServiceMngr
from ._util import log # sub-sys logger from ._util import log # sub-sys logger
from ._util import ( from ._util import (
@ -129,7 +127,7 @@ def start_elasticsearch(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: ServiceMngr, service_mngr: Services,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -53,7 +53,7 @@ import pendulum
# import purerpc # import purerpc
from ..data.feed import maybe_open_feed from ..data.feed import maybe_open_feed
from . import ServiceMngr from . import Services
from ._util import ( from ._util import (
log, # sub-sys logger log, # sub-sys logger
get_console_log, get_console_log,
@ -233,7 +233,7 @@ def start_marketstore(
@acm @acm
async def start_ahab_daemon( async def start_ahab_daemon(
service_mngr: ServiceMngr, service_mngr: Services,
user_config: dict | None = None, user_config: dict | None = None,
loglevel: str | None = None, loglevel: str | None = None,

View File

@ -458,15 +458,13 @@ async def start_backfill(
'bf_until <- last_start_dt:\n' 'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n' f'{backfill_until_dt} <- {last_start_dt}\n'
) )
# UGH: what's a better way?
# TODO: backends are responsible for being correct on # ugh, what's a better way?
# this right!? # TODO: fwiw, we probably want a way to signal a throttle
# -[ ] in the `ib` case we could maybe offer some way # condition (eg. with ib) so that we can halt the
# to halt the request loop until the condition is # request loop until the condition is resolved?
# resolved or should the backend be entirely in if timeframe > 1:
# charge of solving such faults? yes, right? await tractor.pause()
# if timeframe > 1:
# await tractor.pause()
return return
assert ( assert (
@ -580,7 +578,6 @@ async def start_backfill(
'crypto', 'crypto',
'crypto_currency', 'crypto_currency',
'fiat', # a "forex pair" 'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}: }:
# for now, our table key schema is not including # for now, our table key schema is not including
# the dst[/src] source asset token. # the dst[/src] source asset token.

2326
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -50,8 +50,10 @@ attrs = "^23.1.0"
bidict = "^0.22.1" bidict = "^0.22.1"
colorama = "^0.4.6" colorama = "^0.4.6"
colorlog = "^6.7.0" colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86" ib-insync = "^0.9.86"
msgspec = "^0.18.6" msgspec = "^0.18.0"
numba = "^0.59.0" numba = "^0.59.0"
numpy = "^1.25" numpy = "^1.25"
polars = "^0.18.13" polars = "^0.18.13"
@ -69,13 +71,11 @@ pdbp = "^1.5.0"
trio = "^0.24" trio = "^0.24"
pendulum = "^3.0.0" pendulum = "^3.0.0"
httpx = "^0.27.0" httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"
[tool.poetry.dependencies.tractor] [tool.poetry.dependencies.tractor]
develop = true develop = true
git = 'https://pikers.dev/goodboy/tractor.git' git = 'https://github.com/goodboy/tractor.git'
branch = 'aio_abandons' branch = 'asyncio_debugger_support'
# path = "../tractor" # path = "../tractor"
[tool.poetry.dependencies.asyncvnc] [tool.poetry.dependencies.asyncvnc]
@ -109,8 +109,6 @@ pytest = "^6.0.0"
elasticsearch = "^8.9.0" elasticsearch = "^8.9.0"
xonsh = "^0.14.2" xonsh = "^0.14.2"
prompt-toolkit = "3.0.40" prompt-toolkit = "3.0.40"
cython = "^3.0.0"
greenback = "^1.1.1"
# console ehancements and eventually remote debugging # console ehancements and eventually remote debugging
# extras/helpers. # extras/helpers.

View File

@ -10,7 +10,7 @@ from piker import (
config, config,
) )
from piker.service import ( from piker.service import (
get_service_mngr, Services,
) )
from piker.log import get_console_log from piker.log import get_console_log
@ -129,7 +129,7 @@ async def _open_test_pikerd(
) as service_manager, ) as service_manager,
): ):
# this proc/actor is the pikerd # this proc/actor is the pikerd
assert service_manager is get_service_mngr() assert service_manager is Services
async with tractor.wait_for_actor( async with tractor.wait_for_actor(
'pikerd', 'pikerd',

View File

@ -26,7 +26,7 @@ import pytest
import tractor import tractor
from uuid import uuid4 from uuid import uuid4
from piker.service import ServiceMngr from piker.service import Services
from piker.log import get_logger from piker.log import get_logger
from piker.clearing._messages import ( from piker.clearing._messages import (
Order, Order,
@ -158,7 +158,7 @@ def load_and_check_pos(
def test_ems_err_on_bad_broker( def test_ems_err_on_bad_broker(
open_test_pikerd: ServiceMngr, open_test_pikerd: Services,
loglevel: str, loglevel: str,
): ):
async def load_bad_fqme(): async def load_bad_fqme():

View File

@ -15,7 +15,7 @@ import tractor
from piker.service import ( from piker.service import (
find_service, find_service,
ServiceMngr, Services,
) )
from piker.data import ( from piker.data import (
open_feed, open_feed,
@ -44,7 +44,7 @@ def test_runtime_boot(
async def main(): async def main():
port = 6666 port = 6666
daemon_addr = ('127.0.0.1', port) daemon_addr = ('127.0.0.1', port)
services: ServiceMngr services: Services
async with ( async with (
open_test_pikerd( open_test_pikerd(