Compare commits: c65929bfe7 ... e79554d0ac (1 commit)

default.nix
@@ -1,82 +0,0 @@
with (import <nixpkgs> {});
with python312Packages;
let
  glibStorePath = lib.getLib glib;
  qtpyStorePath = lib.getLib qtpy;
  pyqt6StorePath = lib.getLib pyqt6;
  pyqt6SipStorePath = lib.getLib pyqt6-sip;
  qt6baseStorePath = lib.getLib qt6.qtbase;
  rapidfuzzStorePath = lib.getLib rapidfuzz;
  qdarkstyleStorePath = lib.getLib qdarkstyle;
in
stdenv.mkDerivation {
  name = "piker-qt6-poetry-shell";
  buildInputs = [
    # System requirements.
    glib
    qt6.qtbase
    libgcc.lib

    # Python requirements.
    python312Full
    poetry-core
    qdarkstyle
    rapidfuzz
    pyqt6
    qtpy
  ];
  src = null;
  shellHook = ''
    set -e

    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${libgcc.lib}/lib:${glibStorePath}/lib

    # Set the Qt plugin path
    # export QT_DEBUG_PLUGINS=1

    QTBASE_PATH="${qt6baseStorePath}"
    echo "qtbase path:    $QTBASE_PATH"
    echo ""
    export QT_PLUGIN_PATH="$QTBASE_PATH/lib/qt-6/plugins"
    export QT_QPA_PLATFORM_PLUGIN_PATH="$QT_PLUGIN_PATH/platforms"
    echo "qt plugin path: $QT_PLUGIN_PATH"
    echo ""

    # Maybe create venv & install deps
    poetry install --with uis

    # Use pyqt6 from System, patch activate script
    ACTIVATE_SCRIPT_PATH="$(poetry env info --path)/bin/activate"

    export RPDFUZZ_PATH="${rapidfuzzStorePath}/lib/python3.12/site-packages"
    export QDRKSTYLE_PATH="${qdarkstyleStorePath}/lib/python3.12/site-packages"
    export QTPY_PATH="${qtpyStorePath}/lib/python3.12/site-packages"
    export PYQT6_PATH="${pyqt6StorePath}/lib/python3.12/site-packages"
    export PYQT6_SIP_PATH="${pyqt6SipStorePath}/lib/python3.12/site-packages"
    echo "rapidfuzz at:   $RPDFUZZ_PATH"
    echo "qdarkstyle at:  $QDRKSTYLE_PATH"
    echo "qtpy at:        $QTPY_PATH"
    echo "pyqt6 at:       $PYQT6_PATH"
    echo "pyqt6-sip at:   $PYQT6_SIP_PATH"
    echo ""

    PATCH="export PYTHONPATH=\""

    PATCH="$PATCH\$RPDFUZZ_PATH"
    PATCH="$PATCH:\$QDRKSTYLE_PATH"
    PATCH="$PATCH:\$QTPY_PATH"
    PATCH="$PATCH:\$PYQT6_PATH"
    PATCH="$PATCH:\$PYQT6_SIP_PATH"

    PATCH="$PATCH\""

    if grep -q "$PATCH" "$ACTIVATE_SCRIPT_PATH"; then
        echo "venv is already patched."
    else
        echo "patching $ACTIVATE_SCRIPT_PATH to use pyqt6 from nixos..."
        sed -i "\$i$PATCH" $ACTIVATE_SCRIPT_PATH
    fi

    poetry shell
  '';
}
@@ -30,8 +30,7 @@ from types import ModuleType
from typing import (
    Any,
    Iterator,
    Generator,
    TYPE_CHECKING,
    Generator
)

import pendulum
@@ -60,10 +59,8 @@ from ..clearing._messages import (
    BrokerdPosition,
)
from piker.types import Struct
from piker.log import get_logger

if TYPE_CHECKING:
    from piker.data._symcache import SymbologyCache
from piker.data._symcache import SymbologyCache
from ..log import get_logger

log = get_logger(__name__)

@@ -496,17 +493,6 @@ class Account(Struct):

        _mktmap_table: dict[str, MktPair] | None = None,

        only_require: list[str]|True = True,
        # ^list of fqmes that are "required" to be processed from
        # this ledger pass; we often don't care about others and
        # definitely shouldn't always error in such cases.
        # (eg. broker backend loaded that doesn't yet support the
        # symcache but also, inside the paper engine we don't ad-hoc
        # request `get_mkt_info()` for every symbol in the ledger,
        # only the one for which we're simulating against).
        # TODO, not sure if there's a better soln for this, ideally
        # all backends get symcache support afap i guess..

    ) -> dict[str, Position]:
        '''
        Update the internal `.pps[str, Position]` table from input
@@ -549,32 +535,11 @@ class Account(Struct):
                if _mktmap_table is None:
                    raise

                required: bool = (
                    only_require is True
                    or (
                        only_require is not True
                        and
                        fqme in only_require
                    )
                )
                # XXX: caller is allowed to provide a fallback
                # mktmap table for the case where a new position is
                # being added and the preloaded symcache didn't
                # have this entry prior (eg. with frickin IB..)
                if (
                    not (mkt := _mktmap_table.get(fqme))
                    and
                    required
                ):
                    raise

                elif not required:
                    continue

                else:
                    # should be an entry retrieved somewhere
                    assert mkt

                mkt = _mktmap_table[fqme]

            if not (pos := pps.get(bs_mktid)):

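The `only_require` parameter removed above reduces to a simple membership test: `True` means every ledger fqme must resolve to a market, while a list restricts that requirement to just the named fqmes. A standalone sketch (hypothetical helper name) of the intended gating:

    def is_required(fqme: str, only_require: list[str] | bool) -> bool:
        # `True` => all fqmes are required; a list => only those listed.
        return (
            only_require is True
            or fqme in only_require
        )

    assert is_required('xbtusd.kraken', True)
    assert not is_required('ethusd.kraken', ['xbtusd.kraken'])
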
@@ -691,7 +656,7 @@ class Account(Struct):
    def write_config(self) -> None:
        '''
        Write the current account state to the user's account TOML file, normally
        something like `pps.toml`.
        something like ``pps.toml``.

        '''
        # TODO: show diff output?

@@ -51,7 +51,6 @@ __brokers__: list[str] = [
    'ib',
    'kraken',
    'kucoin',
    'deribit',

    # broken but used to work
    # 'questrade',
@@ -62,6 +61,7 @@ __brokers__: list[str] = [
    # wstrade
    # iex

    # deribit
    # bitso
]

@@ -23,7 +23,6 @@ from __future__ import annotations
from contextlib import (
    asynccontextmanager as acm,
)
from functools import partial
from types import ModuleType
from typing import (
    TYPE_CHECKING,
@@ -191,17 +190,14 @@ def broker_init(


async def spawn_brokerd(

    brokername: str,
    loglevel: str | None = None,

    **tractor_kwargs,

) -> bool:
    '''
    Spawn a `brokerd.<backendname>` subactor service daemon
    using `pikerd`'s service mngr.

    '''
    from piker.service._util import log  # use service mngr log
    log.info(f'Spawning {brokername} broker daemon')

@@ -221,35 +217,27 @@ async def spawn_brokerd(

    # ask `pikerd` to spawn a new sub-actor and manage it under its
    # actor nursery
    from piker.service import (
        get_service_mngr,
        ServiceMngr,
    )
    dname: str = tractor_kwargs.pop('name')  # f'brokerd.{brokername}'
    mngr: ServiceMngr = get_service_mngr()
    ctx: tractor.Context = await mngr.start_service(
        daemon_name=dname,
        ctx_ep=partial(
            # signature of target root-task endpoint
            daemon_fixture_ep,
    from piker.service import Services

            # passed to daemon_fixture_ep(**kwargs)
            brokername=brokername,
            loglevel=loglevel,
        ),
        debug_mode=mngr.debug_mode,
        loglevel=loglevel,
        enable_modules=(
            _data_mods
            +
            tractor_kwargs.pop('enable_modules')
        ),
    dname: str = tractor_kwargs.pop('name')  # f'brokerd.{brokername}'
    portal = await Services.actor_n.start_actor(
        dname,
        enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
        debug_mode=Services.debug_mode,
        **tractor_kwargs
    )
    assert (
        not ctx.cancel_called
        and ctx.portal  # parent side
        and dname in ctx.chan.uid  # subactor is named as desired

    # NOTE: the service mngr expects an already spawned actor + its
    # portal ref in order to do non-blocking setup of brokerd
    # service nursery.
    await Services.start_service_task(
        dname,
        portal,

        # signature of target root-task endpoint
        daemon_fixture_ep,
        brokername=brokername,
        loglevel=loglevel,
    )
    return True

@@ -274,7 +262,8 @@ async def maybe_spawn_brokerd(
    from piker.service import maybe_spawn_daemon

    async with maybe_spawn_daemon(
        service_name=f'brokerd.{brokername}',

        f'brokerd.{brokername}',
        service_task_target=spawn_brokerd,
        spawn_args={
            'brokername': brokername,

@@ -567,7 +567,6 @@ class Client:
    ) -> str:
        return {
            'USDTM': 'usdtm_futes',
            'SPOT': 'spot',
            # 'COINM': 'coin_futes',
            # ^-TODO-^ bc someone might want it..?
        }[pair.venue]

@@ -181,6 +181,7 @@ class FutesPair(Pair):
    quoteAsset: str  # 'USDT',
    quotePrecision: int  # 8,
    requiredMarginPercent: float  # '5.0000',
    settlePlan: int  # 0,
    timeInForce: list[str]  # ['GTC', 'IOC', 'FOK', 'GTX'],
    triggerProtect: float  # '0.0500',
    underlyingSubType: list[str]  # ['PoW'],

@@ -25,7 +25,6 @@ from .api import (
    get_client,
)
from .feed import (
    get_mkt_info,
    open_history_client,
    open_symbol_search,
    stream_quotes,
@@ -35,20 +34,15 @@ from .feed import (
    # open_trade_dialog,
    # norm_trade_records,
# )
from .venues import (
    OptionPair,
)

log = get_logger(__name__)

__all__ = [
    'get_client',
#    'trades_dialogue',
    'get_mkt_info',
    'open_history_client',
    'open_symbol_search',
    'stream_quotes',
    'OptionPair',
#    'norm_trade_records',
]

@@ -19,14 +19,10 @@ Deribit backend.

'''
import asyncio
from collections import ChainMap
from contextlib import (
    asynccontextmanager as acm,
)
from datetime import datetime
from decimal import (
    Decimal,
)
from functools import partial
import time
from typing import (
@@ -35,7 +31,7 @@ from typing import (
    Callable,
)

from pendulum import now
import pendulum
import trio
from trio_typing import TaskStatus
from rapidfuzz import process as fuzzy
@@ -55,18 +51,7 @@ from cryptofeed.defines import (
    OPTION, CALL, PUT
)
from cryptofeed.symbols import Symbol
# types for managing the cb callbacks.
# from cryptofeed.types import L1Book
from .venues import (
    MarketType,
    PAIRTYPES,
    Pair,
    OptionPair,
)
from piker.accounting import (
    Asset,
    MktPair,
)

from piker.data import (
    def_iohlcv_fields,
    match_from_pairs,
@@ -95,19 +80,19 @@ _testnet_ws_url = 'wss://test.deribit.com/ws/api/v2'


class JSONRPCResult(Struct):
    jsonrpc: str = '2.0'
    id: int
    result: Optional[list[dict]] = None
    error: Optional[dict] = None
    usIn: int
    usOut: int
    usDiff: int
    testnet: bool
    jsonrpc: str = '2.0'
    result: Optional[list[dict]] = None
    error: Optional[dict] = None

class JSONRPCChannel(Struct):
    jsonrpc: str = '2.0'
    method: str
    params: dict
    jsonrpc: str = '2.0'


class KLinesResult(Struct):
@@ -131,12 +116,9 @@ class Trade(Struct):
    instrument_name: str
    index_price: float
    direction: str
    contracts: float
    amount: float
    combo_trade_id: Optional[int] = 0,
    combo_id: Optional[str] = '',
    block_trade_leg_count: Optional[int] = 0,
    block_trade_id: Optional[str] = '',
    amount: float

class LastTradesResult(Struct):
    trades: list[Trade]
@@ -160,15 +142,13 @@ def str_to_cb_sym(name: str) -> Symbol:
    else:
        raise Exception("Couldn\'t parse option type")

    new_expiry_date = get_values_from_cb_normalized_date(expiry_date)

    return Symbol(
        base=base,
        quote=quote,
        base, quote,
        type=OPTION,
        strike_price=strike_price,
        option_type=option_type,
        expiry_date=new_expiry_date)
        expiry_date=expiry_date,
        expiry_normalize=False)


def piker_sym_to_cb_sym(name: str) -> Symbol:
@@ -179,103 +159,68 @@ def piker_sym_to_cb_sym(name: str) -> Symbol:

    if option_type == 'P':
        option_type = PUT
    elif option_type == 'C':
    elif option_type  == 'C':
        option_type = CALL
    else:
        raise Exception("Couldn\'t parse option type")

    return Symbol(
        base=base,
        quote=quote,
        base, quote,
        type=OPTION,
        strike_price=strike_price,
        option_type=option_type,
        expiry_date=expiry_date)
        expiry_date=expiry_date.upper())


def cb_sym_to_deribit_inst(sym: Symbol):
    new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date)
    otype = 'C' if sym.option_type == CALL else 'P'
    # cryptofeed normalized
    cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']

    return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}'
    # deribit specific
    months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']

    exp = sym.expiry_date


def get_values_from_cb_normalized_date(expiry_date: str) -> str:
    # deribit specific
    cb_norm = [
        'F', 'G', 'H', 'J',
        'K', 'M', 'N', 'Q',
        'U', 'V', 'X', 'Z'
    ]
    months = [
        'JAN', 'FEB', 'MAR', 'APR',
        'MAY', 'JUN', 'JUL', 'AUG',
        'SEP', 'OCT', 'NOV', 'DEC'
    ]
    # YYMDD
    # 01234
    day, month, year = (
        expiry_date[3:],
        months[cb_norm.index(expiry_date[2:3])],
        expiry_date[:2]
    )
    return f'{day}{month}{year}'
    year, month, day = (
        exp[:2], months[cb_norm.index(exp[2:3])], exp[3:])

    otype = 'C' if sym.option_type == CALL else 'P'

    return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}'

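Both versions of the expiry handling above map cryptofeed's normalized single-letter month codes onto Deribit's three-letter month names. A minimal sketch of that conversion, assuming the 'YYMDD' code layout noted in the comments (e.g. '24U20' for 20-SEP-2024):

    # Sketch only: cryptofeed-normalized expiry code -> deribit 'DDMONYY'.
    CB_NORM = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
    MONTHS = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN',
              'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']

    def cb_expiry_to_deribit(code: str) -> str:
        year, month_code, day = code[:2], code[2:3], code[3:]
        return f'{day}{MONTHS[CB_NORM.index(month_code)]}{year}'

    assert cb_expiry_to_deribit('24U20') == '20SEP24'
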
def get_config() -> dict[str, Any]:

    conf: dict
    path: Path
    conf, path = config.load()

    conf, path = config.load(
        conf_name='brokers',
        touch_if_dne=True,
    )
    section: dict = {}
    section = conf.get('deribit')

    section['log'] = {}
    section['log']['filename'] = 'feedhandler.log'
    section['log']['level'] = 'DEBUG'
    section['log']['disabled'] = True
    # TODO: document why we send this, basically because logging params for cryptofeed
    conf['log'] = {}
    conf['log']['disabled'] = True

    if section is None:
        log.warning(f'No config section found for deribit in {path}')
        return {}

    return section

def get_fh_config() -> dict[str, Any]:
    conf_option = get_config().get('option', {})
    conf_log = get_config().get('log', {})

    return {
        'log': {
            'filename': conf_log.get('filename'),
            'level': conf_log.get('level'),
            'disabled': conf_log.get('disabled')
        },
        'deribit': {
            'key_id': conf_option.get('api_key'),
            'key_secret': conf_option.get('api_secret')
        }
    }
    return conf


class Client:

    def __init__(
        self,
    def __init__(self, json_rpc: Callable) -> None:
        self._pairs: dict[str, Any] = None

        json_rpc: Callable
        config = get_config().get('deribit', {})

    ) -> None:
        self._pairs: ChainMap[str, Pair] = ChainMap()
        if ('key_id' in config) and ('key_secret' in config):
            self._key_id = config['key_id']
            self._key_secret = config['key_secret']

        config = get_config().get('option', {})

        self._key_id = config.get('api_key')
        self._key_secret = config.get('api_secret')
        else:
            self._key_id = None
            self._key_secret = None

        self.json_rpc = json_rpc

@@ -283,10 +228,7 @@ class Client:
    def currencies(self):
        return ['btc', 'eth', 'sol', 'usd']

    async def get_balances(
        self,
        kind: str = 'option'
    ) -> dict[str, float]:
    async def get_balances(self, kind: str = 'option') -> dict[str, float]:
        """Return the set of positions for this account
        by symbol.
        """
@@ -302,39 +244,20 @@ class Client:

        return balances

    async def get_assets(
        self,
        venue: str | None = None,

    ) -> dict[str, Asset]:
    async def get_assets(self) -> dict[str, float]:
        """Return the set of asset balances for this account
        by symbol.
        """
        assets = {}
        resp = await self.json_rpc(
            'private/get_account_summaries',
            params={
                'extended' : True
            }
        )
        summaries = resp.result['summaries']
        for summary in summaries:
            currency = summary['currency']
            tx_tick = Decimal('1e-08')
            atype='crypto_currency'
            assets[currency] = Asset(
                name=currency,
                atype=atype,
                tx_tick=tx_tick)
        return assets
        balances = {}

    async def get_mkt_pairs(self) -> dict[str, Pair]:
        flat: dict[str, Pair] = {}
        for key in self._pairs:
            item = self._pairs.get(key)
            flat[item.bs_fqme] = item
        for currency in self.currencies:
            resp = await self.json_rpc(
                'private/get_account_summary', params={
                    'currency': currency.upper()})

        return flat
            balances[currency] = resp.result['balance']

        return balances

    async def submit_limit(
        self,
@@ -363,28 +286,6 @@ class Client:
            'private/cancel', {'order_id': oid})
        return resp.result

    async def exch_info(
        self,
        sym: str | None = None,

        venue: MarketType | None = None,
        expiry: str | None = None,

    ) -> dict[str, Pair] | Pair:

        pair_table: dict[str, Pair] = self._pairs

        if (
            sym
            and (cached_pair := pair_table.get(sym))
        ):
            return cached_pair

        if sym:
            return pair_table[sym.lower()]
        else:
            return self._pairs

    async def symbol_info(
        self,
        instrument: Optional[str] = None,
@@ -392,7 +293,7 @@ class Client:
        kind: str = 'option',
        expired: bool = False

    ) -> dict[str, Pair] | Pair:
    ) -> dict[str, dict]:
        '''
        Get symbol infos.

@@ -412,29 +313,14 @@ class Client:
            params,
        )
        # convert to symbol-keyed table
        pair_type: Type = PAIRTYPES[kind]
        results: list[dict] | None = resp.result

        instruments: dict[str, Pair] = {}
        for item in results:
            symbol=item['instrument_name'].lower()
            try:
                pair: Pair = pair_type(
                    symbol=symbol,
                    **item
                )
            except Exception as e:
                e.add_note(
                    "\nDon't panic, prolly stupid deribit changed their symbology schema again..\n"
                    'Check out their API docs here:\n\n'
                    'https://docs.deribit.com/?python#deribit-api-v2-1-1'
                )
                raise

            instruments[symbol] = pair
        instruments: dict[str, dict] = {
            item['instrument_name'].lower(): item
            for item in results
        }

        if instrument is not None:
            return instruments[instrument.lower()]
            return instruments[instrument]
        else:
            return instruments

@@ -451,12 +337,12 @@ class Client:
        self,
        pattern: str,
        limit: int = 30,
    ) -> dict[str, Pair]:
    ) -> dict[str, Any]:
        '''
        Fuzzy search symbology set for pairs matching `pattern`.

        '''
        pairs: dict[str, Pair] = await self.symbol_info()
        pairs: dict[str, Any] = await self.symbol_info()
        matches: dict[str, Pair] = match_from_pairs(
            pairs=pairs,
            query=pattern.upper(),
@@ -472,19 +358,16 @@ class Client:

    async def bars(
        self,
        mkt: MktPair,

        symbol: str,
        start_dt: Optional[datetime] = None,
        end_dt: Optional[datetime] = None,

        limit: int = 1000,
        as_np: bool = True,

    ) -> list[tuple] | np.ndarray:
        instrument: str = mkt.bs_fqme
    ) -> dict:
        instrument = symbol

        if end_dt is None:
            end_dt = now('UTC')
            end_dt = pendulum.now('UTC')

        if start_dt is None:
            start_dt = end_dt.start_of(
@@ -504,27 +387,29 @@ class Client:
            })

        result = KLinesResult(**resp.result)
        new_bars: list[tuple] = []
        new_bars = []
        for i in range(len(result.close)):

            row = [
            _open = result.open[i]
            high = result.high[i]
            low = result.low[i]
            close = result.close[i]
            volume = result.volume[i]

            row = [
                (start_time + (i * (60 * 1000))) / 1000.0,  # time
                result.open[i],
                result.high[i],
                result.low[i],
                result.close[i],
                result.volume[i]
                result.volume[i],
                0
            ]

            new_bars.append((i,) + tuple(row))

        if not as_np:
            return result

        return np.array(
            new_bars,
            dtype=def_iohlcv_fields
        )
        array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines
        return array

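Both variants of `bars()` pack per-minute rows into a numpy structured array. A self-contained sketch of that pattern, where the field layout is an assumed stand-in for piker's `def_iohlcv_fields` dtype:

    import numpy as np

    # Assumed stand-in dtype: index + epoch time + OHLCV columns.
    IOHLCV_DTYPE = np.dtype([
        ('index', 'i8'), ('time', 'f8'),
        ('open', 'f8'), ('high', 'f8'), ('low', 'f8'),
        ('close', 'f8'), ('volume', 'f8'),
    ])

    rows = [
        (i, 1700000000 + i * 60, 100.0, 101.0, 99.5, 100.5, 42.0)
        for i in range(3)  # three 1m bars
    ]
    ohlcv = np.array(rows, dtype=IOHLCV_DTYPE)
    assert ohlcv['close'][-1] == 100.5
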
    async def last_trades(
        self,
@@ -549,10 +434,10 @@ async def get_client(
    async with (
        trio.open_nursery() as n,
        open_jsonrpc_session(
            _ws_url, response_type=JSONRPCResult
        ) as json_rpc
            _testnet_ws_url, dtype=JSONRPCResult) as json_rpc
    ):
        client = Client(json_rpc)

        _refresh_token: Optional[str] = None
        _access_token: Optional[str] = None

@@ -622,7 +507,7 @@ async def get_client(

@acm
async def open_feed_handler():
    fh = FeedHandler(config=get_fh_config())
    fh = FeedHandler(config=get_config())
    yield fh
    await to_asyncio.run_task(fh.stop_async)

@@ -638,7 +523,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:

async def aio_price_feed_relay(
    fh: FeedHandler,
    instrument: str,
    instrument: Symbol,
    from_trio: asyncio.Queue,
    to_trio: trio.abc.SendChannel,
) -> None:
@@ -657,33 +542,21 @@ async def aio_price_feed_relay(
            'symbol': cb_sym_to_deribit_inst(
                str_to_cb_sym(data.symbol)).lower(),
            'ticks': [
                {
                    'type': 'bid',
                    'price': float(data.bid_price),
                    'size': float(data.bid_size)
                },
                {
                    'type': 'bsize',
                    'price': float(data.bid_price),
                    'size': float(data.bid_size)
                },
                {
                    'type': 'ask',
                    'price': float(data.ask_price),
                    'size': float(data.ask_size)
                },
                {
                    'type': 'asize',
                    'price': float(data.ask_price),
                    'size': float(data.ask_size)
                }
                {'type': 'bid',
                    'price': float(data.bid_price), 'size': float(data.bid_size)},
                {'type': 'bsize',
                    'price': float(data.bid_price), 'size': float(data.bid_size)},
                {'type': 'ask',
                    'price': float(data.ask_price), 'size': float(data.ask_size)},
                {'type': 'asize',
                    'price': float(data.ask_price), 'size': float(data.ask_size)}
            ]
        }))
    sym: Symbol = piker_sym_to_cb_sym(instrument)

    fh.add_feed(
        DERIBIT,
        channels=[TRADES, L1_BOOK],
        symbols=[sym],
        symbols=[piker_sym_to_cb_sym(instrument)],
        callbacks={
            TRADES: _trade,
            L1_BOOK: _l1
@@ -724,9 +597,9 @@ async def maybe_open_price_feed(
    async with maybe_open_context(
        acm_func=open_price_feed,
        kwargs={
            'instrument': instrument.split('.')[0]
            'instrument': instrument
        },
        key=f'{instrument.split('.')[0]}-price',
        key=f'{instrument}-price',
    ) as (cache_hit, feed):
        if cache_hit:
            yield broadcast_receiver(feed, 10)
@@ -791,10 +664,10 @@ async def maybe_open_order_feed(
    async with maybe_open_context(
        acm_func=open_order_feed,
        kwargs={
            'instrument': instrument.split('.')[0],
            'instrument': instrument,
            'fh': fh
        },
        key=f'{instrument.split('.')[0]}-order',
        key=f'{instrument}-order',
    ) as (cache_hit, feed):
        if cache_hit:
            yield broadcast_receiver(feed, 10)

@@ -21,33 +21,18 @@ Deribit backend.
from contextlib import asynccontextmanager as acm
from datetime import datetime
from typing import Any, Optional, Callable
from pprint import pformat
import time

import trio
from trio_typing import TaskStatus
from pendulum import (
    from_timestamp,
    now,
)
import pendulum
from rapidfuzz import process as fuzzy
import numpy as np
import tractor

from piker.accounting import (
    MktPair,
    unpack_fqme,
)
from piker.brokers import (
    open_cached_client,
    NoData,
)
from piker._cacheables import (
    async_lifo_cache,
)
from piker.brokers import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from piker.data.validate import FeedInit
from piker.brokers._util import (
    BrokerError,
    DataUnavailable,
@@ -62,13 +47,9 @@ from cryptofeed.symbols import Symbol
from .api import (
    Client, Trade,
    get_config,
    piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
    str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst,
    maybe_open_price_feed
)
from .venues import (
    Pair,
    OptionPair,
)

_spawn_kwargs = {
    'infect_asyncio': True,
@@ -83,119 +64,36 @@ async def open_history_client(
    mkt: MktPair,
) -> tuple[Callable, int]:

    instrument: str = mkt.bs_fqme
    # TODO implement history getter for the new storage layer.
    async with open_cached_client('deribit') as client:

        async def get_ohlc(
            timeframe: float,
            end_dt: datetime | None = None,
            start_dt: datetime | None = None,
            end_dt: Optional[datetime] = None,
            start_dt: Optional[datetime] = None,

        ) -> tuple[
            np.ndarray,
            datetime,  # start
            datetime,  # end
        ]:
            if timeframe != 60:
                raise DataUnavailable('Only 1m bars are supported')

            array: np.ndarray = await client.bars(
                mkt,
            array = await client.bars(
                instrument,
                start_dt=start_dt,
                end_dt=end_dt,
            )
            if len(array) == 0:
                raise NoData(
                    f'No frame for {start_dt} -> {end_dt}\n'
                )
                raise DataUnavailable

            start_dt = from_timestamp(array[0]['time'])
            end_dt = from_timestamp(array[-1]['time'])

            times = array['time']
            if not times.any():
                raise ValueError(
                    'Bad frame with null-times?\n\n'
                    f'{times}'
                )

            if end_dt is None:
                inow: int = round(time.time())
                if (inow - times[-1]) > 60:
                    await tractor.pause()
            start_dt = pendulum.from_timestamp(array[0]['time'])
            end_dt = pendulum.from_timestamp(array[-1]['time'])

            return array, start_dt, end_dt

        yield get_ohlc, {'erlangs': 3, 'rate': 3}


@async_lifo_cache()
async def get_mkt_info(
    fqme: str,

) -> tuple[MktPair, Pair] | None:

    # uppercase since kraken bs_mktid is always upper
    if 'deribit' not in fqme.lower():
        fqme += '.deribit'

    mkt_mode: str = ''
    broker, mkt_ep, venue, expiry = unpack_fqme(fqme)

    # NOTE: we always upper case all tokens to be consistent with
    # binance's symbology style for pairs, like `BTCUSDT`, but in
    # theory we could also just keep things lower case; as long as
    # we're consistent and the symcache matches whatever this func
    # returns, always!
    expiry: str = expiry.upper()
    venue: str = venue.upper()
    venue_lower: str = venue.lower()

    mkt_mode: str = 'option'

    async with open_cached_client(
        'deribit',
    ) as client:

        assets: dict[str, Asset] = await client.get_assets()
        pair_str: str = mkt_ep.lower()

        # switch venue-mode depending on input pattern parsing
        # since we want to use a particular endpoint (set) for
        # pair info lookup!
        client.mkt_mode = mkt_mode

        pair: Pair = await client.exch_info(
            sym=pair_str,
        )
        dst: Asset | None = assets.get(pair.bs_dst_asset)
        if (
            not dst
            # TODO: a known asset DNE list?
            # and pair.baseAsset == 'DEFI'
        ):
            log.warning(
                f'UNKNOWN {venue} asset {pair.base_currency} from,\n'
                f'{pformat(pair.to_dict())}'
            )

            # XXX UNKNOWN missing "asset", though no idea why?
            # maybe it's only avail in the margin venue(s): /dapi/ ?
            return None

        mkt = MktPair(
            dst=dst,
            src=assets.get(pair.bs_src_asset),
            price_tick=pair.price_tick,
            size_tick=pair.size_tick,
            bs_mktid=pair.symbol,
            expiry=expiry,
            venue=venue,
            broker='deribit',
        )
        return mkt, pair


async def stream_quotes(

    send_chan: trio.abc.SendChannel,
@@ -212,20 +110,25 @@ async def stream_quotes(

    sym = symbols[0]

    init_msgs: list[FeedInit] = []

    async with (
        open_cached_client('deribit') as client,
        send_chan as send_chan
    ):

        mkt, pair = await get_mkt_info(sym)
        init_msgs = {
            # pass back token, and bool, signalling if we're the writer
            # and that history has been written
            sym: {
                'symbol_info': {
                    'asset_type': 'option',
                    'price_tick_size': 0.0005
                },
                'shm_write_opts': {'sum_tick_vml': False},
                'fqsn': sym,
            },
        }

        # build out init msgs according to latest spec
        init_msgs.append(
            FeedInit(mkt_info=mkt)
        )
        nsym = piker_sym_to_cb_sym(sym.split('.')[0])
        nsym = piker_sym_to_cb_sym(sym)

        async with maybe_open_price_feed(sym) as stream:

@@ -276,16 +179,7 @@ async def open_symbol_search(

        async with ctx.open_stream() as stream:

            pattern: str
            async for pattern in stream:
                # NOTE: pattern fuzzy-matching is done within
                # the method impl.
                pairs: dict[str, Pair] = await client.search_symbols(
                    pattern,
                )
                # repack in fqme-keyed table
                byfqme: dict[str, Pair] = {}
                for pair in pairs.values():
                    byfqme[pair.bs_fqme] = pair

                await stream.send(byfqme)
                # repack in dict form
                await stream.send(
                    await client.search_symbols(pattern))

@@ -1,142 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""
Per market data-type definitions and schemas types.

"""
from __future__ import annotations
from typing import (
    Literal,
)
from decimal import Decimal

from msgspec import field

from piker.types import Struct


# API endpoint paths by venue / sub-API
_domain: str = 'deribit.com'
_url = f'https://www.{_domain}'

# WEBsocketz
_ws_url: str = f'wss://www.{_domain}/ws/api/v2'

# test nets
_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2'

MarketType = Literal[
    'option'
]


def get_api_eps(venue: MarketType) -> tuple[str, str]:
    '''
    Return API ep root paths per venue.

    '''
    return {
        'option': (
            _ws_url,
        ),
    }[venue]


class Pair(Struct, frozen=True, kw_only=True):

    symbol: str

    # src
    quote_currency: str # 'BTC'

    # dst
    base_currency: str # "BTC",

    tick_size: float # 0.0001
    tick_size_steps: list[dict[str, str | int | float]] # [{'above_price': 0.005, 'tick_size': 0.0005}]

    @property
    def price_tick(self) -> Decimal:
        step_size: float = self.tick_size_steps[0].get('above_price')
        return Decimal(step_size)

    @property
    def size_tick(self) -> Decimal:
        step_size: float = self.tick_size_steps[0].get('tick_size')
        return Decimal(step_size)

    @property
    def bs_fqme(self) -> str:
        return self.symbol

    @property
    def bs_mktid(self) -> str:
        return f'{self.symbol}.{self.venue}'


class OptionPair(Pair, frozen=True, kw_only=True):

    taker_commission: float # 0.0003
    strike: float # 5000.0
    settlement_period: str # 'day'
    settlement_currency: str # "BTC",
    rfq: bool # false
    price_index: str # 'btc_usd'
    option_type: str # 'call'
    min_trade_amount: float # 0.1
    maker_commission: float # 0.0003
    kind: str # 'option'
    is_active: bool # true
    instrument_type: str # 'reversed'
    instrument_name: str # 'BTC-1SEP24-55000-C'
    instrument_id: int # 364671
    expiration_timestamp: int # 1725177600000
    creation_timestamp: int # 1724918461000
    counter_currency: str # 'USD'
    contract_size: float # '1.0'
    block_trade_tick_size: float # '0.0001'
    block_trade_min_trade_amount: int # '25'
    block_trade_commission: float # '0.003'


    # NOTE: see `.data._symcache.SymbologyCache.load()` for why
    ns_path: str = 'piker.brokers.deribit:OptionPair'

    @property
    def venue(self) -> str:
        return 'OPTION'

    @property
    def bs_fqme(self) -> str:
        return f'{self.symbol}.OPTION'

    @property
    def bs_src_asset(self) -> str:
        return f'{self.quote_currency}'

    @property
    def bs_dst_asset(self) -> str:
        return f'{self.base_currency}'

    @property
    def bs_mktid(self) -> str:
        return f'{self.symbol}.{self.venue}'


PAIRTYPES: dict[MarketType, Pair] = {
    'option': OptionPair,
}
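A worked example for the `price_tick`/`size_tick` properties above, using the step values from the inline comment. Note one subtlety in the shown implementation: `Decimal(step_size)` built straight from a float carries binary-representation noise, whereas `Decimal(str(step_size))` yields the exact literal:

    from decimal import Decimal

    step = {'above_price': 0.005, 'tick_size': 0.0005}

    # exact literals via str():
    assert Decimal(str(step['above_price'])) == Decimal('0.005')
    assert Decimal(str(step['tick_size'])) == Decimal('0.0005')

    # straight from the float: NOT equal to the exact literal
    assert Decimal(step['above_price']) != Decimal('0.005')
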
@@ -111,10 +111,6 @@ class KucoinMktPair(Struct, frozen=True):
    quoteMaxSize: float
    quoteMinSize: float
    symbol: str  # our bs_mktid, kucoin's internal id
    feeCategory: int
    makerFeeCoefficient: float
    takerFeeCoefficient: float
    st: bool


class AccountTrade(Struct, frozen=True):
@@ -597,7 +593,7 @@ async def get_client() -> AsyncGenerator[Client, None]:
    '''
    async with (
        httpx.AsyncClient(
            base_url='https://api.kucoin.com/api',
            base_url=f'https://api.kucoin.com/api',
        ) as trio_client,
    ):
        client = Client(httpx_client=trio_client)
@@ -641,7 +637,7 @@ async def open_ping_task(
                await trio.sleep((ping_interval - 1000) / 1000)
                await ws.send_msg({'id': connect_id, 'type': 'ping'})

        log.warning('Starting ping task for kucoin ws connection')
        log.info('Starting ping task for kucoin ws connection')
        n.start_soon(ping_server)

        yield
@@ -653,14 +649,9 @@ async def get_mkt_info(
async def get_mkt_info(
    fqme: str,

) -> tuple[
    MktPair,
    KucoinMktPair,
]:
) -> tuple[MktPair, KucoinMktPair]:
    '''
    Query for and return both a `piker.accounting.MktPair` and
    `KucoinMktPair` from provided `fqme: str`
    (fully-qualified-market-endpoint).
    Query for and return a `MktPair` and `KucoinMktPair`.

    '''
    async with open_cached_client('kucoin') as client:
@@ -735,8 +726,6 @@ async def stream_quotes(

        log.info(f'Starting up quote stream(s) for {symbols}')
        for sym_str in symbols:
            mkt: MktPair
            pair: KucoinMktPair
            mkt, pair = await get_mkt_info(sym_str)
            init_msgs.append(
                FeedInit(mkt_info=mkt)
@@ -744,11 +733,7 @@ async def stream_quotes(

        ws: NoBsWs
        token, ping_interval = await client._get_ws_token()
        log.info(f'API reported ping_interval: {ping_interval}\n')

        connect_id: str = str(uuid4())
        typ: str
        quote: dict
        connect_id = str(uuid4())
        async with (
            open_autorecon_ws(
                (
@@ -762,37 +747,20 @@ async def stream_quotes(
                ),
            ) as ws,
            open_ping_task(ws, ping_interval, connect_id),
            aclosing(
                iter_normed_quotes(
                    ws, sym_str
                )
            ) as iter_quotes,
            aclosing(stream_messages(ws, sym_str)) as msg_gen,
        ):
            typ, quote = await anext(iter_quotes)
            typ, quote = await anext(msg_gen)

            # take care to not unblock here until we get a real
            # trade quote?
            # ^TODO, remove this right?
            # -[ ] what often blocks chart boot/new-feed switching
            #   since we're waiting for a live quote instead of just
            #   loading history afap..
            #  |_ XXX, not sure if we require a bit of rework to core
            #    feed init logic or if backends just gotta be
            #    changed up.. feel like there was some causality
            #    dilemma prolly only seen with IB too..
            # while typ != 'trade':
            #     typ, quote = await anext(iter_quotes)
            while typ != 'trade':
                # take care to not unblock here until we get a real
                # trade quote
                typ, quote = await anext(msg_gen)

            task_status.started((init_msgs, quote))
            feed_is_live.set()

            # XXX NOTE, DO NOT include the `.<backend>` suffix!
            # OW the sampling loop will not broadcast correctly..
            # since `bus._subscribers.setdefault(bs_fqme, set())`
            # is used inside `.data.open_feed_bus()` !!!
            topic: str = mkt.bs_fqme
            async for typ, quote in iter_quotes:
                await send_chan.send({topic: quote})
            async for typ, msg in msg_gen:
                await send_chan.send({sym_str: msg})

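The removed NOTE above explains why the broadcast topic must be the bare `bs_fqme` with no `.<backend>` suffix: the feed bus registers subscribers under that key via `bus._subscribers.setdefault(bs_fqme, set())`. A toy illustration of the mismatch, assuming a subscriber table of that shape:

    # Quotes published under a suffixed key never reach subscribers
    # registered under the bare bs_fqme (assumed table shape).
    subscribers: dict[str, set] = {}
    subscribers.setdefault('btcusdt', set()).add('chart-1')

    def route(topic: str) -> set:
        return subscribers.get(topic, set())

    assert route('btcusdt') == {'chart-1'}
    assert route('btcusdt.kucoin') == set()  # suffixed key: no delivery
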
@acm
@@ -847,7 +815,7 @@ async def subscribe(
            )


async def iter_normed_quotes(
async def stream_messages(
    ws: NoBsWs,
    sym: str,

@@ -878,9 +846,6 @@ async def iter_normed_quotes(

                yield 'trade', {
                    'symbol': sym,
                    # TODO, is 'last' even used elsewhere/a-good
                    # semantic? can't we just read the ticks with our
                    # .data.ticktools.frame_ticks()`/
                    'last': trade_data.price,
                    'brokerd_ts': last_trade_ts,
                    'ticks': [
@@ -973,7 +938,7 @@ async def open_history_client(
            if end_dt is None:
                inow = round(time.time())

                log.debug(
                print(
                    f'difference in time between load and processing'
                    f'{inow - times[-1]}'
                )

@@ -653,11 +653,7 @@ class Router(Struct):
            flume = feed.flumes[fqme]
            first_quote: dict = flume.first_quote
            book: DarkBook = self.get_dark_book(broker)

            if not (last := first_quote.get('last')):
                last: float = flume.rt_shm.array[-1]['close']

            book.lasts[fqme]: float = float(last)
            book.lasts[fqme]: float = float(first_quote['last'])

            async with self.maybe_open_brokerd_dialog(
                brokermod=brokermod,
@@ -720,7 +716,7 @@ class Router(Struct):
            subs = self.subscribers[sub_key]

        sent_some: bool = False
        for client_stream in subs.copy():
        for client_stream in subs:
            try:
                await client_stream.send(msg)
                sent_some = True
@@ -1014,14 +1010,10 @@ async def translate_and_relay_brokerd_events(
                status_msg.brokerd_msg = msg
                status_msg.src = msg.broker_details['name']

                if not status_msg.req:
                    # likely some order change state?
                    await tractor.pause()
                else:
                    await router.client_broadcast(
                        status_msg.req.symbol,
                        status_msg,
                    )
                await router.client_broadcast(
                    status_msg.req.symbol,
                    status_msg,
                )

                if status == 'closed':
                    log.info(f'Execution for {oid} is complete!')

@@ -653,7 +653,6 @@ async def open_trade_dialog(
                # in) use manually constructed table from calling
                # the `.get_mkt_info()` provider EP above.
                _mktmap_table=mkt_by_fqme,
                only_require=list(mkt_by_fqme),
            )

            pp_msgs: list[BrokerdPosition] = []

@@ -335,7 +335,7 @@ def services(config, tl, ports):
                name='service_query',
                loglevel=config['loglevel'] if tl else None,
            ),
            tractor.get_registry(
            tractor.get_arbiter(
                host=host,
                port=ports[0]
            ) as portal

@@ -25,7 +25,6 @@ from collections import (
    defaultdict,
)
from contextlib import asynccontextmanager as acm
from functools import partial
import time
from typing import (
    Any,
@@ -43,7 +42,7 @@ from tractor.trionics import (
    maybe_open_nursery,
)
import trio
from trio import TaskStatus
from trio_typing import TaskStatus

from .ticktools import (
    frame_ticks,
@@ -71,7 +70,6 @@ if TYPE_CHECKING:
_default_delay_s: float = 1.0


# TODO: use new `tractor.singleton_acm` API for this!
class Sampler:
    '''
    Global sampling engine registry.
@@ -81,9 +79,9 @@ class Sampler:

    This non-instantiated type is meant to be a singleton within
    a `samplerd` actor-service spawned once by the user wishing to
    time-step-sample a (real-time) quote feeds, see
    `.service.maybe_open_samplerd()` and the below
    `register_with_sampler()`.
    time-step-sample (real-time) quote feeds, see
    ``.service.maybe_open_samplerd()`` and the below
    ``register_with_sampler()``.

    '''
    service_nursery: None | trio.Nursery = None
@@ -377,10 +375,7 @@ async def register_with_sampler(
                assert Sampler.ohlcv_shms

            # unblock caller
            await ctx.started(
                # XXX bc msgpack only allows one array type!
                list(Sampler.ohlcv_shms.keys())
            )
            await ctx.started(set(Sampler.ohlcv_shms.keys()))

            if open_index_stream:
                try:
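Aside on the `list(...)` vs `set(...)` change above: msgpack has only one array type, so a Python `set` that crosses the IPC boundary generally comes back as a `list`. A tiny standalone check using `msgspec` (already a project dependency; assumes its default untyped decode behavior):

import msgspec

# a set of shm-period keys; msgpack encodes it as a plain array
wire: bytes = msgspec.msgpack.encode({1.0, 60.0})

# untyped decode yields a list - the set-ness is lost in transit,
# which is what the removed `list(...)` conversion was guarding for
assert isinstance(msgspec.msgpack.decode(wire), list)
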
@@ -424,6 +419,7 @@ async def register_with_sampler(


async def spawn_samplerd(

    loglevel: str | None = None,
    **extra_tractor_kwargs

@@ -433,10 +429,7 @@ async def spawn_samplerd(
    update and increment count write and stream broadcasting.

    '''
    from piker.service import (
        get_service_mngr,
        ServiceMngr,
    )
    from piker.service import Services

    dname = 'samplerd'
    log.info(f'Spawning `{dname}`')
@@ -444,33 +437,26 @@ async def spawn_samplerd(
    # singleton lock creation of ``samplerd`` since we only ever want
    # one daemon per ``pikerd`` proc tree.
    # TODO: make this built-into the service api?
    mngr: ServiceMngr = get_service_mngr()
    already_started: bool = dname in mngr.service_tasks
    async with Services.locks[dname + '_singleton']:

    async with mngr._locks[dname + '_singleton']:
        ctx: Context = await mngr.start_service(
            daemon_name=dname,
            ctx_ep=partial(
        if dname not in Services.service_tasks:

            portal = await Services.actor_n.start_actor(
                dname,
                enable_modules=[
                    'piker.data._sampling',
                ],
                loglevel=loglevel,
                debug_mode=Services.debug_mode,  # set by pikerd flag
                **extra_tractor_kwargs
            )

            await Services.start_service_task(
                dname,
                portal,
                register_with_sampler,
                period_s=1,
                sub_for_broadcasts=False,
            ),
            debug_mode=mngr.debug_mode,  # set by pikerd flag

            # proxy-through to tractor
            enable_modules=[
                'piker.data._sampling',
            ],
            loglevel=loglevel,
            **extra_tractor_kwargs
        )
        if not already_started:
            assert (
                ctx
                and
                ctx.portal
                and
                not ctx.cancel_called
            )
            return True

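The `Services.locks[dname + '_singleton']` acquisition above is the whole singleton story: a per-name mutex plus a registry membership check. A minimal standalone sketch of that pattern in pure trio; `maybe_spawn`, `_locks` and `_started` are hypothetical names, not piker APIs:

from collections import defaultdict

import trio

_locks: defaultdict[str, trio.Lock] = defaultdict(trio.Lock)
_started: set[str] = set()

async def maybe_spawn(name: str) -> bool:
    async with _locks[name + '_singleton']:
        if name in _started:
            return False  # lost the race; the daemon is already up
        await trio.sleep(0.1)  # stand-in for actor spawn + task start
        _started.add(name)
        return True
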
@@ -903,7 +889,6 @@ async def uniform_rate_send(
            # to consumers which crash or lose network connection.
            # I.e. we **DO NOT** want to crash and propagate up to
            # ``pikerd`` these kinds of errors!
            trio.EndOfChannel,
            trio.ClosedResourceError,
            trio.BrokenResourceError,
            ConnectionResetError,
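The exception set above exists so one dead consumer can't take down `pikerd`: the send loop absorbs per-subscriber transport failures and keeps broadcasting to everyone else. A hedged sketch of that shape, where `send_to_subs` is a hypothetical helper and the streams are assumed to expose an async `.send()`:

import trio

async def send_to_subs(subs: set, msg: dict) -> None:
    for stream in subs.copy():
        try:
            await stream.send(msg)
        except (
            trio.ClosedResourceError,
            trio.BrokenResourceError,
            ConnectionResetError,
        ):
            subs.discard(stream)  # drop only the dead consumer
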

@@ -30,11 +30,7 @@ Actor runtime primitives and (distributed) service APIs for,
  => TODO: maybe to (re)move elsewhere?

'''
from ._mngr import (
    get_service_mngr as get_service_mngr,
    open_service_mngr as open_service_mngr,
    ServiceMngr as ServiceMngr,
)
from ._mngr import Services as Services
from ._registry import (
    _tractor_kwargs as _tractor_kwargs,
    _default_reg_addr as _default_reg_addr,

@@ -21,6 +21,7 @@
from __future__ import annotations
import os
from typing import (
    Optional,
    Any,
    ClassVar,
)
@@ -29,13 +30,13 @@ from contextlib import (
)

import tractor
import trio

from ._util import (
    get_console_log,
)
from ._mngr import (
    open_service_mngr,
    ServiceMngr,
    Services,
)
from ._registry import (  # noqa
    _tractor_kwargs,
@@ -58,7 +59,7 @@ async def open_piker_runtime(
    registry_addrs: list[tuple[str, int]] = [],

    enable_modules: list[str] = [],
    loglevel: str|None = None,
    loglevel: Optional[str] = None,

    # XXX NOTE XXX: you should pretty much never want debug mode
    # for data daemons when running in production.
@@ -68,7 +69,7 @@ async def open_piker_runtime(
    # and spawn the service tree distributed per that.
    start_method: str = 'trio',

    tractor_runtime_overrides: dict | None = None,
    **tractor_kwargs,

) -> tuple[
@@ -118,10 +119,6 @@ async def open_piker_runtime(
                # spawn other specialized daemons I think?
                enable_modules=enable_modules,

                # TODO: how to configure this?
                # keep it on by default if debug mode is set?
                # maybe_enable_greenback=debug_mode,

                **tractor_kwargs,
            ) as actor,

@@ -170,13 +167,12 @@ async def open_pikerd(

    **kwargs,

) -> ServiceMngr:
) -> Services:
    '''
    Start a root piker daemon actor (aka `pikerd`) with an indefinite
    lifetime.
    Start a root piker daemon with an indefinite lifetime.

    A root actor-nursery is created which can be used to spawn and
    supervise underling service sub-actors (see below).
    A root actor nursery is created which can be used to create and keep
    alive underling services (see below).

    '''
    # NOTE: for the root daemon we always enable the root
@@ -203,6 +199,8 @@ async def open_pikerd(
            root_actor,
            reg_addrs,
        ),
        tractor.open_nursery() as actor_nursery,
        trio.open_nursery() as service_nursery,
    ):
        for addr in reg_addrs:
            if addr not in root_actor.accept_addrs:
@@ -211,17 +209,25 @@ async def open_pikerd(
                    'Maybe you have another daemon already running?'
                )

        mngr: ServiceMngr
        async with open_service_mngr(
            debug_mode=debug_mode,
        ) as mngr:
            yield mngr
        # assign globally for future daemon/task creation
        Services.actor_n = actor_nursery
        Services.service_n = service_nursery
        Services.debug_mode = debug_mode

        try:
            yield Services

        finally:
            # TODO: is this more clever/efficient?
            # if 'samplerd' in Services.service_tasks:
            #     await Services.cancel_service('samplerd')
            service_nursery.cancel_scope.cancel()


# TODO: do we even need this?
# @acm
# async def maybe_open_runtime(
#     loglevel: str|None = None,
#     loglevel: Optional[str] = None,
#     **kwargs,

# ) -> None:
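The classvar assignment block above is the crux of this side of the diff: nurseries opened once by the root task get parked on the `Services` type so any in-process caller can schedule service tasks, and teardown is a single cancel-scope cancel. The same pattern in miniature, with all names hypothetical:

import trio

class MiniServices:
    service_n: trio.Nursery | None = None

async def root() -> None:
    async with trio.open_nursery() as service_nursery:
        MiniServices.service_n = service_nursery
        try:
            # any code in this proc may now schedule a "service" task
            MiniServices.service_n.start_soon(trio.sleep, 1)
        finally:
            # tear down every service task on the way out
            service_nursery.cancel_scope.cancel()

trio.run(root)
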
@@ -250,7 +256,7 @@ async def maybe_open_pikerd(
    loglevel: str | None = None,
    **kwargs,

) -> tractor._portal.Portal | ClassVar[ServiceMngr]:
) -> tractor._portal.Portal | ClassVar[Services]:
    '''
    If no ``pikerd`` daemon-root-actor can be found start it and
    yield up (we should probably figure out returning a portal to self
    yield up (we should probably figure out returning a portal to self

@@ -49,7 +49,7 @@ from requests.exceptions import (
    ReadTimeout,
)

from ._mngr import ServiceMngr
from ._mngr import Services
from ._util import (
    log,  # sub-sys logger
    get_console_log,
@@ -453,7 +453,7 @@ async def open_ahabd(

@acm
async def start_ahab_service(
    services: ServiceMngr,
    services: Services,
    service_name: str,

    # endpoint config passed as **kwargs
@@ -549,8 +549,7 @@ async def start_ahab_service(
        log.warning('Failed to cancel root permsed container')

    except (
        # trio.MultiError,
        ExceptionGroup,
        trio.MultiError,
    ) as err:
        for subexc in err.exceptions:
            if isinstance(subexc, PermissionError):
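Both spellings in the hunk above unpack grouped child errors the same way: on CPython >= 3.11 `ExceptionGroup` is a builtin, while older trio code used `trio.MultiError`. A self-contained check of the `.exceptions` iteration:

try:
    raise ExceptionGroup(
        'container spawn failures',
        [PermissionError('docker sock needs elevated perms')],
    )
except ExceptionGroup as err:
    for subexc in err.exceptions:
        assert isinstance(subexc, PermissionError)
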

@@ -26,17 +26,14 @@ from typing import (
from contextlib import (
    asynccontextmanager as acm,
)
from collections import defaultdict

import tractor
import trio

from ._util import (
    log,  # sub-sys logger
)
from ._mngr import (
    get_service_mngr,
    ServiceMngr,
    Services,
)
from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service
@@ -44,14 +41,15 @@ from ._registry import find_service

@acm
async def maybe_spawn_daemon(

    service_name: str,
    service_task_target: Callable,

    spawn_args: dict[str, Any],

    loglevel: str | None = None,
    singleton: bool = False,

    _locks = defaultdict(trio.Lock),
    **pikerd_kwargs,

) -> tractor.Portal:
@@ -69,7 +67,7 @@ async def maybe_spawn_daemon(
    '''
    # serialize access to this section to avoid
    # 2 or more tasks racing to create a daemon
    lock = _locks[service_name]
    lock = Services.locks[service_name]
    await lock.acquire()

    async with find_service(
@@ -134,65 +132,7 @@ async def maybe_spawn_daemon(
        async with tractor.wait_for_actor(service_name) as portal:
            lock.release()
            yield portal
            # --- ---- ---
            # XXX NOTE XXX
            # --- ---- ---
            # DO NOT PUT A `portal.cancel_actor()` here (as was prior)!
            #
            # Doing so will cause an "out-of-band" ctxc
            # (`tractor.ContextCancelled`) to be raised inside the
            # `ServiceMngr.open_context_in_task()`'s call to
            # `ctx.wait_for_result()` AND the internal self-ctxc
            # "graceful capture" WILL NOT CATCH IT!
            #
            # This can cause certain types of operations to raise
            # that ctxc BEFORE THEY `return`, resulting in
            # a "false-negative" ctxc being raised when really
            # nothing actually failed, other than our semantic
            # "failure" to suppress an expected, graceful,
            # self-cancel scenario..
            #
            # bUt wHy duZ It WorK lIKe dis..
            # ------------------------------
            # from the perspective of the `tractor.Context` this
            # cancel request was conducted "out of band" since
            # `Context.cancel()` was never called and thus the
            # `._cancel_called: bool` was never set. Despite the
            # remote `.canceller` being set to `pikerd` (i.e. the
            # same `Actor.uid` of the raising service-mngr task) the
            # service-task's ctx itself was never marked as having
            # requested cancellation and thus still raises the ctxc
            # bc it was unaware of any such request.
            #
            # How to make grokin these cases easier tho?
            # ------------------------------------------
            # Because `Portal.cancel_actor()` was called it requests
            # "full-`Actor`-runtime-cancellation" of its peer
            # process which IS NOT THE SAME as a single inter-actor
            # RPC task cancelling its local context with a remote
            # peer `Task` in that same peer process.
            #
            # ?TODO? It might be better if we do one (or all) of the
            # following:
            #
            # -[ ] at least set a special message for the
            #    `ContextCancelled` when raised locally by the
            #    unaware ctx task such that we check for the
            #    `.canceller` being *our `Actor`* and in the case
            #    where `Context._cancel_called == False` we specially
            #    note that this is likely an "out-of-band"
            #    runtime-cancel request triggered by some call to
            #    `Portal.cancel_actor()`, possibly even reporting the
            #    exact LOC of that caller by tracking it inside our
            #    portal-type?
            # -[ ] possibly add another field `ContextCancelled` like
            #    maybe a,
            #    `.request_type: Literal['os', 'proc', 'actor',
            #    'ctx']` type thing which would allow immediately
            #    being able to tell what kind of cancellation caused
            #    the unexpected ctxc?
            # -[ ] REMOVE THIS COMMENT, once we've settled on how to
            #     better augment `tractor` to be more explicit on this!
            await portal.cancel_actor()

async def spawn_emsd(
@@ -207,22 +147,21 @@ async def spawn_emsd(
    """
    log.info('Spawning emsd')

    smngr: ServiceMngr = get_service_mngr()
    portal = await smngr.actor_n.start_actor(
    portal = await Services.actor_n.start_actor(
        'emsd',
        enable_modules=[
            'piker.clearing._ems',
            'piker.clearing._client',
        ],
        loglevel=loglevel,
        debug_mode=smngr.debug_mode,  # set by pikerd flag
        debug_mode=Services.debug_mode,  # set by pikerd flag
        **extra_tractor_kwargs
    )

    # non-blocking setup of clearing service
    from ..clearing._ems import _setup_persistent_emsd

    await smngr.start_service_task(
    await Services.start_service_task(
        'emsd',
        portal,

@@ -18,29 +18,16 @@
daemon-service management API.

"""
from __future__ import annotations
from contextlib import (
    asynccontextmanager as acm,
    # contextmanager as cm,
)
from collections import defaultdict
from dataclasses import (
    dataclass,
    field,
)
import functools
import inspect
from typing import (
    Callable,
    Any,
)

import msgspec
import tractor
import trio
from trio import TaskStatus
from trio_typing import TaskStatus
import tractor
from tractor import (
    ActorNursery,
    current_actor,
    ContextCancelled,
    Context,
@@ -52,130 +39,6 @@ from ._util import (
)


# TODO: implement a singleton deco-API for wrapping the below
# factory's impl for general actor-singleton use?
#
# @singleton
# async def open_service_mngr(
#     **init_kwargs,
# ) -> ServiceMngr:
#     '''
#     Note this function body is invoked IFF no existing singleton instance already
#     exists in this proc's memory.

#     '''
#     # setup
#     yield ServiceMngr(**init_kwargs)
#     # teardown



# TODO: singleton factory API instead of a class API
@acm
async def open_service_mngr(
    *,
    debug_mode: bool = False,

    # impl detail which ensures a single global instance
    _singleton: list[ServiceMngr|None] = [None],
    **init_kwargs,

) -> ServiceMngr:
    '''
    Open a multi-subactor-as-service-daemon tree supervisor.

    The delivered `ServiceMngr` is a singleton instance for each
    actor-process and is allocated on first open and never
    de-allocated unless explicitly deleted by a call to
    `del_service_mngr()`.

    '''
    # TODO: factor this allocation into
    # a `._mngr.open_service_mngr()` and put in the
    # once-n-only-once setup/`.__aenter__()` part!
    # -[ ] how to make this only happen on the `mngr == None` case?
    #  |_ use `.trionics.maybe_open_context()` (for generic
    #     async-with-style-only-once of the factory impl, though
    #     what do we do for the allocation case?
    #    / `.maybe_open_nursery()` (since for this specific case
    #    it's simpler?) to activate
    async with (
        tractor.open_nursery() as an,
        trio.open_nursery() as tn,
    ):
        # impl specific obvi..
        init_kwargs.update({
            'actor_n': an,
            'service_n': tn,
        })

        mngr: ServiceMngr|None
        if (mngr := _singleton[0]) is None:

            log.info('Allocating a new service mngr!')
            mngr = _singleton[0] = ServiceMngr(**init_kwargs)

            # TODO: put into `.__aenter__()` section of
            # eventual `@singleton_acm` API wrapper.
            #
            # assign globally for future daemon/task creation
            mngr.actor_n = an
            mngr.service_n = tn

        else:
            assert (
                mngr.actor_n
                and
                mngr.service_n
            )
            log.info(
                'Using extant service mngr!\n\n'
                f'{mngr!r}\n'  # it has a nice `.__repr__()` of services state
            )

        try:
            # NOTE: this is a singleton factory impl specific detail
            # which should be supported in the condensed
            # `@singleton_acm` API?
            mngr.debug_mode = debug_mode

            yield mngr
        finally:
            # TODO: is this more clever/efficient?
            # if 'samplerd' in mngr.service_tasks:
            #     await mngr.cancel_service('samplerd')
            tn.cancel_scope.cancel()


def get_service_mngr() -> ServiceMngr:
    '''
    Try to get the singleton service-mngr for this actor presuming it
    has already been allocated using,

    .. code:: python

        async with open_<@singleton_acm(func)>() as mngr:
            ... this block kept open ...

    If not yet allocated raise a `ServiceError`.

    '''
    # https://stackoverflow.com/a/12627202
    # https://docs.python.org/3/library/inspect.html#inspect.Signature
    maybe_mngr: ServiceMngr|None = inspect.signature(
        open_service_mngr
    ).parameters['_singleton'].default[0]

    if maybe_mngr is None:
        raise RuntimeError(
            'Someone must allocate a `ServiceMngr` using\n\n'
            '`async with open_service_mngr()` beforehand!!\n'
        )

    return maybe_mngr


# TODO: we need remote wrapping and a general soln:
# - factor this into a ``tractor.highlevel`` extension # pack for the
#   library.
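The `_singleton: list[ServiceMngr|None] = [None]` parameter removed above leans on Python evaluating default arguments exactly once: the one-element list becomes process-global storage which `inspect.signature()` can dig back out, per the SO link in the code. A generic sketch of the trick with hypothetical names:

import inspect

def open_thing(_slot: list = [None]):  # mutable default on purpose!
    if _slot[0] is None:
        _slot[0] = object()  # first open allocates the singleton
    return _slot[0]

def get_thing():
    # peek at the exact same default-list object via the signature
    slot = inspect.signature(open_thing).parameters['_slot'].default
    if slot[0] is None:
        raise RuntimeError('call open_thing() first!')
    return slot[0]

assert open_thing() is get_thing()
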
@@ -183,46 +46,31 @@ def get_service_mngr() -> ServiceMngr:
#   to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
#   new actors and supervises them to completion?
@dataclass
class ServiceMngr:
# class ServiceMngr(msgspec.Struct):
    '''
    A multi-subactor-as-service manager.
class Services:

    Spawn, supervise and monitor service/daemon subactors in a SC
    process tree.

    '''
    actor_n: ActorNursery
    actor_n: tractor._supervise.ActorNursery
    service_n: trio.Nursery
    debug_mode: bool = False  # tractor sub-actor debug mode flag

    debug_mode: bool  # tractor sub-actor debug mode flag
    service_tasks: dict[
        str,
        tuple[
            trio.CancelScope,
            Context,
            Portal,
            trio.Event,
        ]
    ] = field(default_factory=dict)

    # internal per-service task mutexes
    _locks = defaultdict(trio.Lock)
    ] = {}
    locks = defaultdict(trio.Lock)

    @classmethod
    async def start_service_task(
        self,
        name: str,
        portal: Portal,

        # TODO: typevar for the return type of the target and then
        # use it below for `ctx_res`?
        target: Callable,

        allow_overruns: bool = False,
        **ctx_kwargs,

    ) -> (trio.CancelScope, Context, Any):
    ) -> (trio.CancelScope, Context):
        '''
        Open a context in a service sub-actor, add to a stack
        that gets unwound at ``pikerd`` teardown.
@@ -235,7 +83,6 @@ class ServiceMngr:
            task_status: TaskStatus[
                tuple[
                    trio.CancelScope,
                    Context,
                    trio.Event,
                    Any,
                ]
@@ -243,87 +90,64 @@ class ServiceMngr:

        ) -> Any:

            # TODO: use the ctx._scope directly here instead?
            # -[ ] actually what semantics do we expect for this
            #   usage!?
            with trio.CancelScope() as cs:
                try:
                    async with portal.open_context(
                        target,
                        allow_overruns=allow_overruns,
                        **ctx_kwargs,

                    ) as (ctx, started):
                async with portal.open_context(
                    target,
                    allow_overruns=allow_overruns,
                    **ctx_kwargs,

                        # unblock once the remote context has started
                        complete = trio.Event()
                        task_status.started((
                            cs,
                            ctx,
                            complete,
                            started,
                        ))
                        log.info(
                            f'`pikerd` service {name} started with value {started}'
                        )
                ) as (ctx, first):

                    # unblock once the remote context has started
                    complete = trio.Event()
                    task_status.started((cs, complete, first))
                    log.info(
                        f'`pikerd` service {name} started with value {first}'
                    )
                    try:
                        # wait on any context's return value
                        # and any final portal result from the
                        # sub-actor.
                        ctx_res: Any = await ctx.wait_for_result()
                        ctx_res: Any = await ctx.result()

                        # NOTE: blocks indefinitely until cancelled
                        # either by error from the target context
                        # function or by being cancelled here by the
                        # surrounding cancel scope.
                        return (
                            await portal.wait_for_result(),
                            ctx_res,
                        )
                        return (await portal.result(), ctx_res)
                    except ContextCancelled as ctxe:
                        canceller: tuple[str, str] = ctxe.canceller
                        our_uid: tuple[str, str] = current_actor().uid
                        if (
                            canceller != portal.channel.uid
                            and
                            canceller != our_uid
                        ):
                            log.cancel(
                                f'Actor-service {name} was remotely cancelled?\n'
                                f'remote canceller: {canceller}\n'
                                f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
                            )
                        else:
                            raise

                except ContextCancelled as ctxe:
                    canceller: tuple[str, str] = ctxe.canceller
                    our_uid: tuple[str, str] = current_actor().uid
                    if (
                        canceller != portal.chan.uid
                        and
                        canceller != our_uid
                    ):
                        log.cancel(
                            f'Actor-service `{name}` was remotely cancelled by a peer?\n'

                            # TODO: this would be a good spot to use
                            # a respawn feature Bo
                            f'-> Keeping `pikerd` service manager alive despite this inter-peer cancel\n\n'

                            f'cancellee: {portal.chan.uid}\n'
                            f'canceller: {canceller}\n'
                        )
                    else:
                        raise
                    finally:
                        await portal.cancel_actor()
                        complete.set()
                        self.service_tasks.pop(name)

                finally:
                    # NOTE: the ctx MUST be cancelled first if we
                    # don't want the above `ctx.wait_for_result()` to
                    # raise a self-ctxc. WHY, well since from the ctx's
                    # perspective the cancel request will have
                    # arrived out-of-band at the `Actor.cancel()`
                    # level, thus `Context.cancel_called == False`,
                    # meaning `ctx._is_self_cancelled() == False`.
                    # with trio.CancelScope(shield=True):
                    # await ctx.cancel()
                    await portal.cancel_actor()
                    complete.set()
                    self.service_tasks.pop(name)

        cs, sub_ctx, complete, started = await self.service_n.start(
            open_context_in_task
        )
        cs, complete, first = await self.service_n.start(open_context_in_task)

        # store the cancel scope and portal for later cancellation or
        # restart if needed.
        self.service_tasks[name] = (cs, sub_ctx, portal, complete)
        return cs, sub_ctx, started
        self.service_tasks[name] = (cs, portal, complete)

        return cs, first

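Both versions of `start_service_task` hang off the same trio handshake: the spawner blocks in `nursery.start()` until the child task calls `task_status.started()`, receiving back the child's cancel scope plus a completion event for targeted shutdown later. A standalone sketch with hypothetical names:

import trio

async def service(task_status=trio.TASK_STATUS_IGNORED):
    with trio.CancelScope() as cs:
        complete = trio.Event()
        task_status.started((cs, complete))  # unblock the spawner
        try:
            await trio.sleep_forever()  # the real service body
        finally:
            complete.set()

async def main():
    async with trio.open_nursery() as tn:
        cs, complete = await tn.start(service)
        cs.cancel()            # cancel just this one service..
        await complete.wait()  # ..and wait for its teardown

trio.run(main)
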
    @classmethod
    async def cancel_service(
        self,
        name: str,
@@ -334,80 +158,8 @@ class ServiceMngr:

        '''
        log.info(f'Cancelling `pikerd` service {name}')
        cs, sub_ctx, portal, complete = self.service_tasks[name]

        # cs.cancel()
        await sub_ctx.cancel()
        cs, portal, complete = self.service_tasks[name]
        cs.cancel()
        await complete.wait()

        if name in self.service_tasks:
            # TODO: custom err?
            # raise ServiceError(
            raise RuntimeError(
                f'Service task for {name} not terminated?'
            )

        # assert name not in self.service_tasks, \
        #     f'Service task for {name} not terminated?'

    async def start_service(
        self,
        daemon_name: str,
        ctx_ep: Callable,  # kwargs must be `partial`-ed in!

        debug_mode: bool = False,
        **tractor_actor_kwargs,

    ) -> Context:
        '''
        Start a "service" task in a new sub-actor (daemon) and manage its lifetime
        indefinitely.

        Services can be cancelled/shutdown using `.cancel_service()`.

        '''
        entry: tuple|None = self.service_tasks.get(daemon_name)
        if entry:
            (cs, sub_ctx, portal, complete) = entry
            return sub_ctx

        if daemon_name not in self.service_tasks:
            portal = await self.actor_n.start_actor(
                daemon_name,
                debug_mode=(  # maybe set globally during allocate
                    debug_mode
                    or
                    self.debug_mode
                ),
                **tractor_actor_kwargs,
            )
            ctx_kwargs: dict[str, Any] = {}
            if isinstance(ctx_ep, functools.partial):
                ctx_kwargs: dict[str, Any] = ctx_ep.keywords
                ctx_ep: Callable = ctx_ep.func

            (cs, sub_ctx, started) = await self.start_service_task(
                daemon_name,
                portal,
                ctx_ep,
                **ctx_kwargs,
            )

            return sub_ctx


# TODO:
# -[ ] factor all the common shit from `.data._sampling`
#   and `.brokers._daemon` into here / `ServiceMngr`
#   in terms of allocating the `Portal` as part of the
#   "service-in-subactor" starting!
# -[ ] move to `tractor.hilevel._service`, import and use here!
# NOTE: purposely leaks the ref to the mod-scope Bo
# import tractor
# from tractor.hilevel import (
#     open_service_mngr,
#     ServiceMngr,
# )
# mngr: ServiceMngr|None = None
# with tractor.hilevel.open_service_mngr() as mngr:
#     Services = proxy(mngr)
        assert name not in self.service_tasks, \
            f'Service task for {name} not terminated?'

@@ -21,13 +21,11 @@ from typing import (
    TYPE_CHECKING,
)

# TODO: oof, needs to be changed to `httpx`!
import asks

if TYPE_CHECKING:
    import docker
    from ._ahab import DockerContainer
    from . import ServiceMngr

from ._util import log  # sub-sys logger
from ._util import (
@@ -129,7 +127,7 @@ def start_elasticsearch(

@acm
async def start_ahab_daemon(
    service_mngr: ServiceMngr,
    service_mngr: Services,
    user_config: dict | None = None,
    loglevel: str | None = None,


@@ -53,7 +53,7 @@ import pendulum
# import purerpc

from ..data.feed import maybe_open_feed
from . import ServiceMngr
from . import Services
from ._util import (
    log,  # sub-sys logger
    get_console_log,
@@ -233,7 +233,7 @@ def start_marketstore(

@acm
async def start_ahab_daemon(
    service_mngr: ServiceMngr,
    service_mngr: Services,
    user_config: dict | None = None,
    loglevel: str | None = None,


@@ -458,15 +458,13 @@ async def start_backfill(
                    'bf_until <- last_start_dt:\n'
                    f'{backfill_until_dt} <- {last_start_dt}\n'
                )
                # UGH: what's a better way?
                # TODO: backends are responsible for being correct on
                # this right!?
                # -[ ] in the `ib` case we could maybe offer some way
                #     to halt the request loop until the condition is
                #     resolved or should the backend be entirely in
                #     charge of solving such faults? yes, right?
                # if timeframe > 1:
                #     await tractor.pause()

                # ugh, what's a better way?
                # TODO: fwiw, we probably want a way to signal a throttle
                # condition (eg. with ib) so that we can halt the
                # request loop until the condition is resolved?
                if timeframe > 1:
                    await tractor.pause()
                return

            assert (
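One way the throttle TODO a few lines up could shake out: let a backend raise a typed throttle fault and have the request loop halt-and-retry with backoff, instead of pausing into the debugger. Purely a sketch; `Throttled` and `request_loop` are hypothetical names, not existing piker or ib APIs:

import trio

class Throttled(Exception):
    '''Backend signal that the provider is rate-limiting us.'''

async def request_loop(fetch) -> None:
    backoff: float = 1.0
    while True:
        try:
            await fetch()
            backoff = 1.0  # healthy again, reset
        except Throttled:
            await trio.sleep(backoff)  # halt the loop, then retry
            backoff = min(backoff * 2, 60)
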
@@ -580,7 +578,6 @@ async def start_backfill(
                    'crypto',
                    'crypto_currency',
                    'fiat',  # a "forex pair"
                    'perpetual_future',  # stupid "perps" from cex land
                }:
                    # for now, our table key schema is not including
                    # the dst[/src] source asset token.

(File diff suppressed because it is too large.)

@@ -50,8 +50,10 @@ attrs = "^23.1.0"
bidict = "^0.22.1"
colorama = "^0.4.6"
colorlog = "^6.7.0"
cython = "^3.0.0"
greenback = "^1.1.1"
ib-insync = "^0.9.86"
msgspec = "^0.18.6"
msgspec = "^0.18.0"
numba = "^0.59.0"
numpy = "^1.25"
polars = "^0.18.13"
@@ -69,13 +71,11 @@ pdbp = "^1.5.0"
trio = "^0.24"
pendulum = "^3.0.0"
httpx = "^0.27.0"
cryptofeed = "^2.4.0"
pyarrow = "^17.0.0"

[tool.poetry.dependencies.tractor]
develop = true
git = 'https://pikers.dev/goodboy/tractor.git'
branch = 'aio_abandons'
git = 'https://github.com/goodboy/tractor.git'
branch = 'asyncio_debugger_support'
# path = "../tractor"

[tool.poetry.dependencies.asyncvnc]
@@ -109,8 +109,6 @@ pytest = "^6.0.0"
elasticsearch = "^8.9.0"
xonsh = "^0.14.2"
prompt-toolkit = "3.0.40"
cython = "^3.0.0"
greenback = "^1.1.1"

# console enhancements and eventually remote debugging
# extras/helpers.

@@ -10,7 +10,7 @@ from piker import (
    config,
)
from piker.service import (
    get_service_mngr,
    Services,
)
from piker.log import get_console_log

@@ -129,7 +129,7 @@ async def _open_test_pikerd(
        ) as service_manager,
    ):
        # this proc/actor is the pikerd
        assert service_manager is get_service_mngr()
        assert service_manager is Services

        async with tractor.wait_for_actor(
            'pikerd',

@@ -26,7 +26,7 @@ import pytest
import tractor
from uuid import uuid4

from piker.service import ServiceMngr
from piker.service import Services
from piker.log import get_logger
from piker.clearing._messages import (
    Order,
@@ -158,7 +158,7 @@ def load_and_check_pos(


def test_ems_err_on_bad_broker(
    open_test_pikerd: ServiceMngr,
    open_test_pikerd: Services,
    loglevel: str,
):
    async def load_bad_fqme():

@@ -15,7 +15,7 @@ import tractor

from piker.service import (
    find_service,
    ServiceMngr,
    Services,
)
from piker.data import (
    open_feed,
@@ -44,7 +44,7 @@ def test_runtime_boot(
    async def main():
        port = 6666
        daemon_addr = ('127.0.0.1', port)
        services: ServiceMngr
        services: Services

        async with (
            open_test_pikerd(