diff --git a/piker/accounting/__init__.py b/piker/accounting/__init__.py index 72b883df..7a587261 100644 --- a/piker/accounting/__init__.py +++ b/piker/accounting/__init__.py @@ -19,8 +19,10 @@ for tendiez. ''' -from ..log import get_logger - +from piker.log import ( + get_console_log, + get_logger, +) from .calc import ( iter_by_dt, ) @@ -51,7 +53,17 @@ from ._allocate import ( log = get_logger(__name__) +# ?TODO, enable console on import +# [ ] necessary? or `open_brokerd_dialog()` doing it is sufficient? +# +# bc might as well enable whenev imported by +# other sub-sys code (namely `.clearing`). +get_console_log( + level='warning', + name=__name__, +) +# TODO, the `as ` style? __all__ = [ 'Account', 'Allocator', diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index 64c56ba1..e4577b47 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -60,12 +60,16 @@ from ..clearing._messages import ( BrokerdPosition, ) from piker.types import Struct -from piker.log import get_logger +from piker.log import ( + get_logger, +) if TYPE_CHECKING: from piker.data._symcache import SymbologyCache -log = get_logger(__name__) +log = get_logger( + name=__name__, +) class Position(Struct): diff --git a/piker/accounting/cli.py b/piker/accounting/cli.py index f68cdfca..10306898 100644 --- a/piker/accounting/cli.py +++ b/piker/accounting/cli.py @@ -21,7 +21,6 @@ CLI front end for trades ledger and position tracking management. from __future__ import annotations from pprint import pformat - from rich.console import Console from rich.markdown import Markdown import polars as pl @@ -29,7 +28,10 @@ import tractor import trio import typer -from ..log import get_logger +from piker.log import ( + get_console_log, + get_logger, +) from ..service import ( open_piker_runtime, ) @@ -45,6 +47,7 @@ from .calc import ( open_ledger_dfs, ) +log = get_logger(name=__name__) ledger = typer.Typer() @@ -79,7 +82,10 @@ def sync( "-l", ), ): - log = get_logger(loglevel) + log = get_console_log( + level=loglevel, + name=__name__, + ) console = Console() pair: tuple[str, str] diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 94e4cbe1..b64231a5 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -25,15 +25,16 @@ from types import ModuleType from tractor.trionics import maybe_open_context +from piker.log import ( + get_logger, +) from ._util import ( - log, BrokerError, SymbolNotFound, NoData, DataUnavailable, DataThrottle, resproc, - get_logger, ) __all__: list[str] = [ @@ -43,7 +44,6 @@ __all__: list[str] = [ 'DataUnavailable', 'DataThrottle', 'resproc', - 'get_logger', ] __brokers__: list[str] = [ @@ -65,6 +65,10 @@ __brokers__: list[str] = [ # bitso ] +log = get_logger( + name=__name__, +) + def get_brokermod(brokername: str) -> ModuleType: ''' diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index 5414bfb9..7c251568 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -33,12 +33,18 @@ import exceptiongroup as eg import tractor import trio +from piker.log import ( + get_logger, + get_console_log, +) from . import _util from . import get_brokermod if TYPE_CHECKING: from ..data import _FeedsBus +log = get_logger(name=__name__) + # `brokerd` enabled modules # TODO: move this def to the `.data` subpkg.. # NOTE: keeping this list as small as possible is part of our caps-sec @@ -59,7 +65,7 @@ _data_mods: str = [ async def _setup_persistent_brokerd( ctx: tractor.Context, brokername: str, - loglevel: str | None = None, + loglevel: str|None = None, ) -> None: ''' @@ -72,13 +78,14 @@ async def _setup_persistent_brokerd( # since all hosted daemon tasks will reference this same # log instance's (actor local) state and thus don't require # any further (level) configuration on their own B) - log = _util.get_console_log( - loglevel or tractor.current_actor().loglevel, + actor: tractor.Actor = tractor.current_actor() + tll: str = actor.loglevel + log = get_console_log( + level=loglevel or tll, name=f'{_util.subsys}.{brokername}', + with_tractor_log=bool(tll), ) - - # set global for this actor to this new process-wide instance B) - _util.log = log + assert log.name == _util.subsys # further, set the log level on any broker broker specific # logger instance. @@ -97,7 +104,7 @@ async def _setup_persistent_brokerd( # NOTE: see ep invocation details inside `.data.feed`. try: async with ( - tractor.trionics.collapse_eg(), + # tractor.trionics.collapse_eg(), trio.open_nursery() as service_nursery ): bus: _FeedsBus = feed.get_feed_bus( @@ -193,7 +200,6 @@ def broker_init( async def spawn_brokerd( - brokername: str, loglevel: str | None = None, @@ -201,8 +207,10 @@ async def spawn_brokerd( ) -> bool: - from piker.service._util import log # use service mngr log - log.info(f'Spawning {brokername} broker daemon') + log.info( + f'Spawning broker-daemon,\n' + f'backend: {brokername!r}' + ) ( brokermode, @@ -249,7 +257,7 @@ async def spawn_brokerd( async def maybe_spawn_brokerd( brokername: str, - loglevel: str | None = None, + loglevel: str|None = None, **pikerd_kwargs, @@ -265,8 +273,7 @@ async def maybe_spawn_brokerd( from piker.service import maybe_spawn_daemon async with maybe_spawn_daemon( - - f'brokerd.{brokername}', + service_name=f'brokerd.{brokername}', service_task_target=spawn_brokerd, spawn_args={ 'brokername': brokername, diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index 3588a87a..47b10ad0 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -19,15 +19,13 @@ Handy cross-broker utils. """ from __future__ import annotations -from functools import partial +# from functools import partial import json import httpx import logging -from ..log import ( - get_logger, - get_console_log, +from piker.log import ( colorize_json, ) subsys: str = 'piker.brokers' @@ -35,12 +33,22 @@ subsys: str = 'piker.brokers' # NOTE: level should be reset by any actor that is spawned # as well as given a (more) explicit name/key such # as `piker.brokers.binance` matching the subpkg. -log = get_logger(subsys) +# log = get_logger(subsys) -get_console_log = partial( - get_console_log, - name=subsys, -) +# ?TODO?? we could use this approach, but we need to be able +# to pass multiple `name=` values so for example we can include the +# emissions in `.accounting._pos` and others! +# [ ] maybe we could do the `log = get_logger()` above, +# then cycle through the list of subsys mods we depend on +# and then get all their loggers and pass them to +# `get_console_log(logger=)`?? +# [ ] OR just write THIS `get_console_log()` as a hook which does +# that based on who calls it?.. i dunno +# +# get_console_log = partial( +# get_console_log, +# name=subsys, +# ) class BrokerError(Exception): diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 919e8152..13eebea0 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -37,8 +37,9 @@ import trio from piker.accounting import ( Asset, ) -from piker.brokers._util import ( +from piker.log import ( get_logger, + get_console_log, ) from piker.data._web_bs import ( open_autorecon_ws, @@ -69,7 +70,9 @@ from .venues import ( ) from .api import Client -log = get_logger('piker.brokers.binance') +log = get_logger( + name=__name__, +) # Fee schedule template, mostly for paper engine fees modelling. @@ -245,9 +248,16 @@ async def handle_order_requests( @tractor.context async def open_trade_dialog( ctx: tractor.Context, + loglevel: str = 'warning', ) -> AsyncIterator[dict[str, Any]]: + # enable piker.clearing console log for *this* `brokerd` subactor + get_console_log( + level=loglevel, + name=__name__, + ) + # TODO: how do we set this from the EMS such that # positions are loaded from the correct venue on the user # stream at startup? (that is in an attempt to support both diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py index ff5f6ec0..ba02634e 100644 --- a/piker/brokers/binance/feed.py +++ b/piker/brokers/binance/feed.py @@ -64,9 +64,9 @@ from piker.data._web_bs import ( open_autorecon_ws, NoBsWs, ) +from piker.log import get_logger from piker.brokers._util import ( DataUnavailable, - get_logger, ) from .api import ( @@ -78,7 +78,7 @@ from .venues import ( get_api_eps, ) -log = get_logger('piker.brokers.binance') +log = get_logger(name=__name__) class L1(Struct): diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 626b4ff8..45c5c41c 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -27,14 +27,12 @@ import click import trio import tractor -from ..cli import cli -from .. import watchlists as wl -from ..log import ( +from piker.cli import cli +from piker import watchlists as wl +from piker.log import ( colorize_json, -) -from ._util import ( - log, get_console_log, + get_logger, ) from ..service import ( maybe_spawn_brokerd, @@ -45,12 +43,15 @@ from ..brokers import ( get_brokermod, data, ) -DEFAULT_BROKER = 'binance' +log = get_logger( + name=__name__, +) + +DEFAULT_BROKER = 'binance' _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') - OK = '\033[92m' WARNING = '\033[93m' FAIL = '\033[91m' @@ -345,7 +346,10 @@ def contracts(ctx, loglevel, broker, symbol, ids): ''' brokermod = get_brokermod(broker) - get_console_log(loglevel) + get_console_log( + level=loglevel, + name=__name__, + ) contracts = trio.run(partial(core.contracts, brokermod, symbol)) if not ids: @@ -477,11 +481,12 @@ def search( # the `piker --pdb` XD .. # -[ ] pull from the parent click ctx's values..dumdum # assert pdb + loglevel: str = config['loglevel'] # define tractor entrypoint async def main(func): async with maybe_open_pikerd( - loglevel=config['loglevel'], + loglevel=loglevel, debug_mode=pdb, ): return await func() @@ -494,6 +499,7 @@ def search( core.symbol_search, brokermods, pattern, + loglevel=loglevel, ), ) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index c1aa88ac..60623f85 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -28,12 +28,14 @@ from typing import ( import trio -from ._util import log +from piker.log import get_logger from . import get_brokermod from ..service import maybe_spawn_brokerd from . import open_cached_client from ..accounting import MktPair +log = get_logger(name=__name__) + async def api(brokername: str, methname: str, **kwargs) -> dict: ''' @@ -147,6 +149,7 @@ async def search_w_brokerd( async def symbol_search( brokermods: list[ModuleType], pattern: str, + loglevel: str = 'warning', **kwargs, ) -> dict[str, dict[str, dict[str, Any]]]: @@ -176,6 +179,7 @@ async def symbol_search( '_infect_asyncio', False, ), + loglevel=loglevel ) as portal: results.append(( diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 6d178b51..12010ede 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -41,12 +41,15 @@ import tractor from tractor.experimental import msgpub from async_generator import asynccontextmanager -from ._util import ( - log, +from piker.log import( + get_logger, get_console_log, ) from . import get_brokermod +log = get_logger( + name='piker.brokers.binance', +) async def wait_for_network( net_func: Callable, @@ -243,7 +246,10 @@ async def start_quote_stream( ''' # XXX: why do we need this again? - get_console_log(tractor.current_actor().loglevel) + get_console_log( + level=tractor.current_actor().loglevel, + name=__name__, + ) # pull global vars from local actor symbols = list(symbols) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 5ecd4e55..10b3dde1 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -34,13 +34,13 @@ import subprocess import tractor -from piker.brokers._util import get_logger +from piker.log import get_logger if TYPE_CHECKING: from .api import Client import i3ipc -log = get_logger('piker.brokers.ib') +log = get_logger(name=__name__) _reset_tech: Literal[ 'vnc', diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 1e9d9c1b..e8514958 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -50,6 +50,10 @@ from ib_insync.objects import ( ) from piker import config +from piker.log import ( + get_logger, + get_console_log, +) from piker.types import Struct from piker.accounting import ( Position, @@ -77,7 +81,6 @@ from piker.clearing._messages import ( BrokerdFill, BrokerdError, ) -from ._util import log from .api import ( _accounts2clients, get_config, @@ -95,6 +98,10 @@ from .ledger import ( update_ledger_from_api_trades, ) +log = get_logger( + name=__name__, +) + def pack_position( pos: IbPosition, @@ -536,9 +543,15 @@ class IbAcnt(Struct): @tractor.context async def open_trade_dialog( ctx: tractor.Context, + loglevel: str = 'warning', ) -> AsyncIterator[dict[str, Any]]: + get_console_log( + level=loglevel, + name=__name__, + ) + # task local msg dialog tracking flows = OrderDialogs() accounts_def = config.load_accounts(['ib']) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 28054da4..6cb46ce6 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -56,11 +56,11 @@ from piker.brokers._util import ( NoData, DataUnavailable, ) +from piker.log import get_logger from .api import ( # _adhoc_futes_set, Client, con2fqme, - log, load_aio_clients, MethodProxy, open_client_proxies, @@ -78,6 +78,9 @@ from .symbols import get_mkt_info if TYPE_CHECKING: from trio._core._run import Task +log = get_logger( + name=__name__, +) # XXX NOTE: See available types table docs: # https://interactivebrokers.github.io/tws-api/tick_types.html diff --git a/piker/brokers/ib/ledger.py b/piker/brokers/ib/ledger.py index d62b4ba7..dc23748d 100644 --- a/piker/brokers/ib/ledger.py +++ b/piker/brokers/ib/ledger.py @@ -44,6 +44,7 @@ from ib_insync import ( CommissionReport, ) +from piker.log import get_logger from piker.types import Struct from piker.data import ( SymbologyCache, @@ -57,7 +58,6 @@ from piker.accounting import ( iter_by_dt, ) from ._flex_reports import parse_flex_dt -from ._util import log if TYPE_CHECKING: from .api import ( @@ -65,6 +65,9 @@ if TYPE_CHECKING: MethodProxy, ) +log = get_logger( + name=__name__, +) tx_sort: Callable = partial( iter_by_dt, diff --git a/piker/brokers/ib/symbols.py b/piker/brokers/ib/symbols.py index 04ec74e4..6e208fe6 100644 --- a/piker/brokers/ib/symbols.py +++ b/piker/brokers/ib/symbols.py @@ -42,10 +42,7 @@ from piker.accounting import ( from piker._cacheables import ( async_lifo_cache, ) - -from ._util import ( - log, -) +from piker.log import get_logger if TYPE_CHECKING: from .api import ( @@ -53,6 +50,10 @@ if TYPE_CHECKING: Client, ) +log = get_logger( + name=__name__, +) + _futes_venues = ( 'GLOBEX', 'NYMEX', diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 0aaf5730..c2bdada4 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -62,9 +62,12 @@ from piker.clearing._messages import ( from piker.brokers import ( open_cached_client, ) +from piker.log import ( + get_console_log, + get_logger, +) from piker.data import open_symcache from .api import ( - log, Client, BrokerError, ) @@ -78,6 +81,8 @@ from .ledger import ( verify_balances, ) +log = get_logger(name=__name__) + MsgUnion = Union[ BrokerdCancel, BrokerdError, @@ -431,9 +436,15 @@ def trades2pps( @tractor.context async def open_trade_dialog( ctx: tractor.Context, + loglevel: str = 'warning', ) -> AsyncIterator[dict[str, Any]]: + get_console_log( + level=loglevel, + name=__name__, + ) + async with ( # TODO: maybe bind these together and deliver # a tuple from `.open_cached_client()`? diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 97ef5a3a..69c888cb 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -50,13 +50,19 @@ from . import open_cached_client from piker._cacheables import async_lifo_cache from .. import config from ._util import resproc, BrokerError, SymbolNotFound -from ..log import ( +from piker.log import ( colorize_json, -) -from ._util import ( - log, get_console_log, ) +from piker.log import ( + get_logger, +) + + +log = get_logger( + name=__name__, +) + _use_practice_account = False _refresh_token_ep = 'https://{}login.questrade.com/oauth2/' @@ -1205,7 +1211,10 @@ async def stream_quotes( # feed_type: str = 'stock', ) -> AsyncGenerator[str, Dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel) + get_console_log( + level=loglevel, + name=__name__, + ) async with open_cached_client('questrade') as client: if feed_type == 'stock': diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index 8fc5739f..e3ac75ea 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -30,9 +30,16 @@ import asks from ._util import ( resproc, BrokerError, - log, ) -from ..calc import percent_change +from piker.calc import percent_change +from piker.log import ( + get_logger, +) + +log = get_logger( + name=__name__, +) + _service_ep = 'https://api.robinhood.com' diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 21edcbb7..d6c4caa2 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -215,7 +215,7 @@ async def relay_orders_from_sync_code( async def open_ems( fqme: str, mode: str = 'live', - loglevel: str = 'error', + loglevel: str = 'warning', ) -> tuple[ OrderClient, # client diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index a5939eb7..782351c0 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -47,6 +47,7 @@ from tractor import trionics from ._util import ( log, # sub-sys logger get_console_log, + subsys, ) from ..accounting._mktinfo import ( unpack_fqme, @@ -136,7 +137,7 @@ class DarkBook(Struct): tuple[ Callable[[float], bool], # predicate tuple[str, ...], # tickfilter - dict | Order, # cmd / msg type + dict|Order, # cmd / msg type # live submission constraint parameters float, # percent_away max price diff @@ -278,7 +279,7 @@ async def clear_dark_triggers( # remove exec-condition from set log.info(f'Removing trigger for {oid}') - trigger: tuple | None = execs.pop(oid, None) + trigger: tuple|None = execs.pop(oid, None) if not trigger: log.warning( f'trigger for {oid} was already removed!?' @@ -336,8 +337,8 @@ async def open_brokerd_dialog( brokermod: ModuleType, portal: tractor.Portal, exec_mode: str, - fqme: str | None = None, - loglevel: str | None = None, + fqme: str|None = None, + loglevel: str|None = None, ) -> tuple[ tractor.MsgStream, @@ -351,9 +352,21 @@ async def open_brokerd_dialog( broker backend, configuration, or client code usage. ''' + get_console_log( + level=loglevel, + name='clearing', + ) + # enable `.accounting` console since normally used by + # each `brokerd`. + get_console_log( + level=loglevel, + name='piker.accounting', + ) broker: str = brokermod.name - def mk_paper_ep(): + def mk_paper_ep( + loglevel: str, + ): from . import _paper_engine as paper_mod nonlocal brokermod, exec_mode @@ -405,17 +418,21 @@ async def open_brokerd_dialog( if ( trades_endpoint is not None - or exec_mode != 'paper' + or + exec_mode != 'paper' ): # open live brokerd trades endpoint open_trades_endpoint = portal.open_context( trades_endpoint, + loglevel=loglevel, ) @acm async def maybe_open_paper_ep(): if exec_mode == 'paper': - async with mk_paper_ep() as msg: + async with mk_paper_ep( + loglevel=loglevel, + ) as msg: yield msg return @@ -426,7 +443,9 @@ async def open_brokerd_dialog( # runtime indication that the backend can't support live # order ctrl yet, so boot the paperboi B0 if first == 'paper': - async with mk_paper_ep() as msg: + async with mk_paper_ep( + loglevel=loglevel, + ) as msg: yield msg return else: @@ -761,12 +780,16 @@ _router: Router = None @tractor.context async def _setup_persistent_emsd( ctx: tractor.Context, - loglevel: str | None = None, + loglevel: str|None = None, ) -> None: if loglevel: - get_console_log(loglevel) + _log = get_console_log( + level=loglevel, + name=subsys, + ) + assert _log.name == 'piker.clearing' global _router @@ -822,7 +845,7 @@ async def translate_and_relay_brokerd_events( f'Rx brokerd trade msg:\n' f'{fmsg}' ) - status_msg: Status | None = None + status_msg: Status|None = None match brokerd_msg: # BrokerdPosition @@ -1283,7 +1306,7 @@ async def process_client_order_cmds( and status.resp == 'dark_open' ): # remove from dark book clearing - entry: tuple | None = dark_book.triggers[fqme].pop(oid, None) + entry: tuple|None = dark_book.triggers[fqme].pop(oid, None) if entry: ( pred, diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index e9daf1a5..9b49508e 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -59,9 +59,9 @@ from piker.data import ( open_symcache, ) from piker.types import Struct -from ._util import ( - log, # sub-sys logger +from piker.log import ( get_console_log, + get_logger, ) from ._messages import ( BrokerdCancel, @@ -73,6 +73,8 @@ from ._messages import ( BrokerdError, ) +log = get_logger(name=__name__) + class PaperBoi(Struct): ''' @@ -550,16 +552,18 @@ _sells: defaultdict[ @tractor.context async def open_trade_dialog( - ctx: tractor.Context, broker: str, - fqme: str | None = None, # if empty, we only boot broker mode + fqme: str|None = None, # if empty, we only boot broker mode loglevel: str = 'warning', ) -> None: - # enable piker.clearing console log for *this* subactor - get_console_log(loglevel) + # enable piker.clearing console log for *this* `brokerd` subactor + get_console_log( + level=loglevel, + name=__name__, + ) symcache: SymbologyCache async with open_symcache(get_brokermod(broker)) as symcache: diff --git a/piker/clearing/_util.py b/piker/clearing/_util.py index c82a01aa..67c501f7 100644 --- a/piker/clearing/_util.py +++ b/piker/clearing/_util.py @@ -28,12 +28,14 @@ from ..log import ( from piker.types import Struct subsys: str = 'piker.clearing' -log = get_logger(subsys) +log = get_logger( + name='piker.clearing', +) # TODO, oof doesn't this ignore the `loglevel` then??? get_console_log = partial( get_console_log, - name=subsys, + name='clearing', ) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index fdecb818..f469242e 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -61,7 +61,8 @@ def load_trans_eps( if ( network - and not maddrs + and + not maddrs ): # load network section and (attempt to) connect all endpoints # which are reachable B) @@ -112,31 +113,27 @@ def load_trans_eps( default=None, help='Multiaddrs to bind or contact', ) -# @click.option( -# '--tsdb', -# is_flag=True, -# help='Enable local ``marketstore`` instance' -# ) -# @click.option( -# '--es', -# is_flag=True, -# help='Enable local ``elasticsearch`` instance' -# ) def pikerd( maddr: list[str] | None, loglevel: str, tl: bool, pdb: bool, - # tsdb: bool, - # es: bool, ): ''' - Spawn the piker broker-daemon. + Start the "root service actor", `pikerd`, run it until + cancellation. + + This "root daemon" operates as the top most service-mngr and + subsys-as-subactor supervisor, think of it as the "init proc" of + any of any `piker` application or daemon-process tree. ''' # from tractor.devx import maybe_open_crash_handler # with maybe_open_crash_handler(pdb=False): - log = get_console_log(loglevel, name='cli') + log = get_console_log( + level=loglevel, + with_tractor_log=tl, + ) if pdb: log.warning(( @@ -237,6 +234,14 @@ def cli( regaddr: str, ) -> None: + ''' + The "root" `piker`-cmd CLI endpoint. + + NOTE, this def generally relies on and requires a sub-cmd to be + provided by the user, OW only a `--help` msg (listing said + subcmds) will be dumped to console. + + ''' if configdir is not None: assert os.path.isdir(configdir), f"`{configdir}` is not a valid path" config._override_config_dir(configdir) @@ -295,17 +300,50 @@ def cli( @click.option('--tl', is_flag=True, help='Enable tractor logging') @click.argument('ports', nargs=-1, required=False) @click.pass_obj -def services(config, tl, ports): +def services( + config, + tl: bool, + ports: list[int], +): + ''' + List all `piker` "service deamons" to the console in + a `json`-table which maps each actor's UID in the form, - from ..service import ( + `{service_name}.{subservice_name}.{UUID}` + + to its (primary) IPC server address. + + (^TODO, should be its multiaddr form once we support it) + + Note that by convention actors which operate as "headless" + processes (those without GUIs/graphics, and which generally + parent some noteworthy subsystem) are normally suffixed by + a "d" such as, + + - pikerd: the root runtime supervisor + - brokerd: a broker-backend order ctl daemon + - emsd: the internal dark-clearing and order routing daemon + - datad: a data-provider-backend data feed daemon + - samplerd: the real-time data sampling and clock-syncing daemon + + "Headed units" are normally just given an obvious app-like name + with subactors indexed by `.` such as, + - chart: the primary modal charting iface, a Qt app + - chart.fsp_0: a financial-sig-proc cascade instance which + delivers graphics to a parent `chart` app. + - polars_boi: some (presumably) `polars` using console app. + + ''' + from piker.service import ( open_piker_runtime, _default_registry_port, _default_registry_host, ) - host = _default_registry_host + # !TODO, mk this to work with UDS! + host: str = _default_registry_host if not ports: - ports = [_default_registry_port] + ports: list[int] = [_default_registry_port] addr = tractor._addr.wrap_address( addr=(host, ports[0]) @@ -316,7 +354,11 @@ def services(config, tl, ports): async with ( open_piker_runtime( name='service_query', - loglevel=config['loglevel'] if tl else None, + loglevel=( + config['loglevel'] + if tl + else None + ), ), tractor.get_registry( addr=addr, @@ -336,7 +378,15 @@ def services(config, tl, ports): def _load_clis() -> None: - # from ..service import elastic # noqa + ''' + Dynamically load and register all subsys CLI endpoints (at call + time). + + NOTE, obviously this is normally expected to be called at + `import` time and implicitly relies on our use of various + `click`/`typer` decorator APIs. + + ''' from ..brokers import cli # noqa from ..ui import cli # noqa from ..watchlists import cli # noqa @@ -346,5 +396,5 @@ def _load_clis() -> None: from ..accounting import cli # noqa -# load downstream cli modules +# load all subsytem cli eps _load_clis() diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index f8a0ec27..74ecf114 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -336,10 +336,18 @@ async def register_with_sampler( open_index_stream: bool = True, # open a 2way stream for sample step msgs? sub_for_broadcasts: bool = True, # sampler side to send step updates? + loglevel: str|None = None, ) -> set[int]: - get_console_log(tractor.current_actor().loglevel) + get_console_log( + level=( + loglevel + or + tractor.current_actor().loglevel + ), + name=__name__, + ) incr_was_started: bool = False try: @@ -476,6 +484,7 @@ async def spawn_samplerd( register_with_sampler, period_s=1, sub_for_broadcasts=False, + loglevel=loglevel, ) return True @@ -484,7 +493,6 @@ async def spawn_samplerd( @acm async def maybe_open_samplerd( - loglevel: str|None = None, **pikerd_kwargs, @@ -513,10 +521,10 @@ async def open_sample_stream( shms_by_period: dict[float, dict]|None = None, open_index_stream: bool = True, sub_for_broadcasts: bool = True, + loglevel: str|None = None, - cache_key: str|None = None, - allow_new_sampler: bool = True, - + # cache_key: str|None = None, + # allow_new_sampler: bool = True, ensure_is_active: bool = False, ) -> AsyncIterator[dict[str, float]]: @@ -551,7 +559,9 @@ async def open_sample_stream( # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes - maybe_open_samplerd() as portal, + maybe_open_samplerd( + loglevel=loglevel, + ) as portal, portal.open_context( register_with_sampler, @@ -560,6 +570,7 @@ async def open_sample_stream( 'shms_by_period': shms_by_period, 'open_index_stream': open_index_stream, 'sub_for_broadcasts': sub_for_broadcasts, + 'loglevel': loglevel, }, ) as (ctx, shm_periods) ): diff --git a/piker/data/_util.py b/piker/data/_util.py index 8c78255f..a1aafcd0 100644 --- a/piker/data/_util.py +++ b/piker/data/_util.py @@ -26,7 +26,9 @@ from ..log import ( ) subsys: str = 'piker.data' -log = get_logger(subsys) +log = get_logger( + name=subsys, +) get_console_log = partial( get_console_log, diff --git a/piker/data/feed.py b/piker/data/feed.py index 9cc37cd7..50955801 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -62,7 +62,6 @@ from ._util import ( log, get_console_log, ) -from .flows import Flume from .validate import ( FeedInit, validate_backend, @@ -77,6 +76,7 @@ from ._sampling import ( ) if TYPE_CHECKING: + from .flows import Flume from tractor._addr import Address from tractor.msg.types import Aid @@ -239,7 +239,6 @@ async def allocate_persistent_feed( brokername: str, symstr: str, - loglevel: str, start_stream: bool = True, init_timeout: float = 616, @@ -278,7 +277,7 @@ async def allocate_persistent_feed( # ``stream_quotes()``, a required broker backend endpoint. init_msgs: ( list[FeedInit] # new - | dict[str, dict[str, str]] # legacy / deprecated + |dict[str, dict[str, str]] # legacy / deprecated ) # TODO: probably make a struct msg type for this as well @@ -348,11 +347,14 @@ async def allocate_persistent_feed( izero_rt, rt_shm, ) = await bus.nursery.start( - manage_history, - mod, - mkt, - some_data_ready, - feed_is_live, + partial( + manage_history, + mod=mod, + mkt=mkt, + some_data_ready=some_data_ready, + feed_is_live=feed_is_live, + loglevel=loglevel, + ) ) # yield back control to starting nursery once we receive either @@ -362,6 +364,8 @@ async def allocate_persistent_feed( ) await some_data_ready.wait() + # XXX, avoid cycle; it imports this mod. + from .flows import Flume flume = Flume( # TODO: we have to use this for now since currently the @@ -458,7 +462,6 @@ async def allocate_persistent_feed( @tractor.context async def open_feed_bus( - ctx: tractor.Context, brokername: str, symbols: list[str], # normally expected to the broker-specific fqme @@ -479,13 +482,16 @@ async def open_feed_bus( ''' if loglevel is None: - loglevel = tractor.current_actor().loglevel + loglevel: str = tractor.current_actor().loglevel # XXX: required to propagate ``tractor`` loglevel to piker # logging get_console_log( - loglevel - or tractor.current_actor().loglevel + level=(loglevel + or + tractor.current_actor().loglevel + ), + name=__name__, ) # local state sanity checks @@ -500,7 +506,6 @@ async def open_feed_bus( sub_registered = trio.Event() flumes: dict[str, Flume] = {} - for symbol in symbols: # if no cached feed for this symbol has been created for this @@ -684,6 +689,7 @@ class Feed(Struct): ''' mods: dict[str, ModuleType] = {} portals: dict[ModuleType, tractor.Portal] = {} + flumes: dict[ str, # FQME Flume, @@ -797,7 +803,7 @@ async def install_brokerd_search( @acm async def maybe_open_feed( fqmes: list[str], - loglevel: str | None = None, + loglevel: str|None = None, **kwargs, @@ -881,7 +887,6 @@ async def open_feed( # one actor per brokerd for now brokerd_ctxs = [] - for brokermod, bfqmes in providers.items(): # if no `brokerd` for this backend exists yet we spawn @@ -951,6 +956,8 @@ async def open_feed( assert len(feed.mods) == len(feed.portals) + # XXX, avoid cycle; it imports this mod. + from .flows import Flume async with ( trionics.gather_contexts(bus_ctxs) as ctxs, ): diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 5d1fd45a..b7806719 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -24,6 +24,7 @@ from functools import partial from typing import ( AsyncIterator, Callable, + TYPE_CHECKING, ) import numpy as np @@ -33,12 +34,12 @@ import tractor from tractor.msg import NamespacePath from piker.types import Struct -from ..log import get_logger, get_console_log -from .. import data -from ..data.feed import ( - Flume, - Feed, +from ..log import ( + get_logger, + get_console_log, ) +from .. import data +from ..data.flows import Flume from ..data._sharedmem import ShmArray from ..data._sampling import ( _default_delay_s, @@ -52,6 +53,9 @@ from ._api import ( ) from ..toolz import Profiler +if TYPE_CHECKING: + from ..data.feed import Feed + log = get_logger(__name__) @@ -169,8 +173,10 @@ class Cascade(Struct): if not synced: fsp: Fsp = self.fsp log.warning( - '***DESYNCED FSP***\n' - f'{fsp.ns_path}@{src_shm.token}\n' + f'***DESYNCED fsp***\n' + f'------------------\n' + f'ns-path: {fsp.ns_path!r}\n' + f'shm-token: {src_shm.token}\n' f'step_diff: {step_diff}\n' f'len_diff: {len_diff}\n' ) @@ -398,7 +404,6 @@ async def connect_streams( @tractor.context async def cascade( - ctx: tractor.Context, # data feed key @@ -412,7 +417,7 @@ async def cascade( shm_registry: dict[str, _Token], zero_on_step: bool = False, - loglevel: str | None = None, + loglevel: str|None = None, ) -> None: ''' @@ -426,7 +431,17 @@ async def cascade( ) if loglevel: - get_console_log(loglevel) + log = get_console_log( + loglevel, + name=__name__, + ) + # XXX TODO! + # figure out why this writes a dict to, + # `tractor._state._runtime_vars['_root_mailbox']` + # XD .. wtf + # TODO, solve this as reported in, + # https://www.pikers.dev/pikers/piker/issues/70 + # await tractor.pause() src: Flume = Flume.from_msg(src_flume_addr) dst: Flume = Flume.from_msg( @@ -469,7 +484,8 @@ async def cascade( # open a data feed stream with requested broker feed: Feed async with data.feed.maybe_open_feed( - [fqme], + fqmes=[fqme], + loglevel=loglevel, # TODO throttle tick outputs from *this* daemon since # it'll emit tons of ticks due to the throttle only @@ -567,7 +583,8 @@ async def cascade( # on every step msg received from the global `samplerd` # service. async with open_sample_stream( - float(delay_s) + period_s=float(delay_s), + loglevel=loglevel, ) as istream: profiler(f'{func_name}: sample stream up') diff --git a/piker/log.py b/piker/log.py index dc5cfc59..0145debf 100644 --- a/piker/log.py +++ b/piker/log.py @@ -37,35 +37,84 @@ _proj_name: str = 'piker' def get_logger( - name: str = None, - + name: str|None = None, + **tractor_log_kwargs, ) -> logging.Logger: ''' - Return the package log or a sub-log for `name` if provided. + Return the package log or a sub-logger if a `name=` is provided, + which defaults to the calling module's pkg-namespace path. + + See `tractor.log.get_logger()` for details. ''' + pkg_name: str = _proj_name + if ( + name + and + pkg_name in name + ): + name: str = name.lstrip(f'{_proj_name}.') + return tractor.log.get_logger( name=name, - _root_name=_proj_name, + pkg_name=pkg_name, + **tractor_log_kwargs, ) def get_console_log( - level: str | None = None, - name: str | None = None, + level: str|None = None, + name: str|None = None, + pkg_name: str|None = None, + with_tractor_log: bool = False, + # ?TODO, support a "log-spec" style `str|dict[str, str]` which + # dictates both the sublogger-key and a level? + # -> see similar idea in `modden`'s usage. + **tractor_log_kwargs, ) -> logging.Logger: ''' - Get the package logger and enable a handler which writes to stderr. + Get the package logger and enable a handler which writes to + stderr. - Yeah yeah, i know we can use ``DictConfig``. You do it... + Yeah yeah, i know we can use `DictConfig`. + You do it.. Bp ''' + pkg_name: str = _proj_name + if ( + name + and + pkg_name in name + ): + name: str = name.lstrip(f'{_proj_name}.') + + tll: str|None = None + if ( + with_tractor_log is not False + ): + tll = level + + elif maybe_actor := tractor.current_actor( + err_on_no_runtime=False, + ): + tll = maybe_actor.loglevel + + if tll: + t_log = tractor.log.get_console_log( + level=tll, + name='tractor', # <- XXX, force root tractor log! + **tractor_log_kwargs, + ) + # TODO/ allow only enabling certain tractor sub-logs? + assert t_log.name == 'tractor' + return tractor.log.get_console_log( - level, + level=level, name=name, - _root_name=_proj_name, - ) # our root logger + pkg_name=pkg_name, + **tractor_log_kwargs, + ) def colorize_json( diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 33f23453..9991c027 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -21,7 +21,6 @@ from __future__ import annotations import os from typing import ( - Optional, Any, ClassVar, ) @@ -32,9 +31,12 @@ from contextlib import ( import tractor import trio -from ._util import ( +from piker.log import ( get_console_log, ) +from ._util import ( + subsys, +) from ._mngr import ( Services, ) @@ -59,7 +61,7 @@ async def open_piker_runtime( registry_addrs: list[tuple[str, int]] = [], enable_modules: list[str] = [], - loglevel: Optional[str] = None, + loglevel: str|None = None, # XXX NOTE XXX: you should pretty much never want debug mode # for data daemons when running in production. @@ -69,7 +71,7 @@ async def open_piker_runtime( # and spawn the service tree distributed per that. start_method: str = 'trio', - tractor_runtime_overrides: dict | None = None, + tractor_runtime_overrides: dict|None = None, **tractor_kwargs, ) -> tuple[ @@ -97,7 +99,8 @@ async def open_piker_runtime( # setting it as the root actor on localhost. registry_addrs = ( registry_addrs - or [_default_reg_addr] + or + [_default_reg_addr] ) if ems := tractor_kwargs.pop('enable_modules', None): @@ -163,8 +166,7 @@ _root_modules: list[str] = [ @acm async def open_pikerd( registry_addrs: list[tuple[str, int]], - - loglevel: str | None = None, + loglevel: str|None = None, # XXX: you should pretty much never want debug mode # for data daemons when running in production. @@ -192,7 +194,6 @@ async def open_pikerd( async with ( open_piker_runtime( - name=_root_dname, loglevel=loglevel, debug_mode=debug_mode, @@ -273,7 +274,10 @@ async def maybe_open_pikerd( ''' if loglevel: - get_console_log(loglevel) + get_console_log( + name=subsys, + level=loglevel + ) # subtle, we must have the runtime up here or portal lookup will fail query_name = kwargs.pop( diff --git a/piker/service/_ahab.py b/piker/service/_ahab.py index 4cccf855..94b18c9d 100644 --- a/piker/service/_ahab.py +++ b/piker/service/_ahab.py @@ -49,13 +49,15 @@ from requests.exceptions import ( ReadTimeout, ) -from ._mngr import Services -from ._util import ( - log, # sub-sys logger +from piker.log import ( get_console_log, + get_logger, ) +from ._mngr import Services from .. import config +log = get_logger(name=__name__) + class DockerNotStarted(Exception): 'Prolly you dint start da daemon bruh' @@ -336,13 +338,16 @@ class Container: async def open_ahabd( ctx: tractor.Context, endpoint: str, # ns-pointer str-msg-type - loglevel: str | None = None, + loglevel: str = 'cancel', **ep_kwargs, ) -> None: - log = get_console_log(loglevel or 'cancel') + log = get_console_log( + level=loglevel, + name='piker.service', + ) async with open_docker() as client: diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index 89d7f28d..7cd7cdd1 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -30,8 +30,9 @@ from contextlib import ( import tractor from trio.lowlevel import current_task -from ._util import ( - log, # sub-sys logger +from piker.log import ( + get_console_log, + get_logger, ) from ._mngr import ( Services, @@ -39,16 +40,17 @@ from ._mngr import ( from ._actor_runtime import maybe_open_pikerd from ._registry import find_service +log = get_logger(name=__name__) + @acm async def maybe_spawn_daemon( - service_name: str, service_task_target: Callable, spawn_args: dict[str, Any], - loglevel: str | None = None, + loglevel: str|None = None, singleton: bool = False, **pikerd_kwargs, @@ -66,6 +68,12 @@ async def maybe_spawn_daemon( clients. ''' + log = get_console_log( + level=loglevel, + name=__name__, + ) + assert log.name == 'piker.service' + # serialize access to this section to avoid # 2 or more tasks racing to create a daemon lock = Services.locks[service_name] @@ -152,8 +160,7 @@ async def maybe_spawn_daemon( async def spawn_emsd( - - loglevel: str | None = None, + loglevel: str|None = None, **extra_tractor_kwargs ) -> bool: @@ -190,9 +197,8 @@ async def spawn_emsd( @acm async def maybe_open_emsd( - brokername: str, - loglevel: str | None = None, + loglevel: str|None = None, **pikerd_kwargs, diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py index 726a34c8..38fc1a3e 100644 --- a/piker/service/_mngr.py +++ b/piker/service/_mngr.py @@ -34,9 +34,9 @@ from tractor import ( Portal, ) -from ._util import ( - log, # sub-sys logger -) +from piker.log import get_logger + +log = get_logger(name=__name__) # TODO: we need remote wrapping and a general soln: diff --git a/piker/service/_registry.py b/piker/service/_registry.py index 94ccbc68..e24539e2 100644 --- a/piker/service/_registry.py +++ b/piker/service/_registry.py @@ -27,15 +27,29 @@ from typing import ( ) import tractor -from tractor import Portal - -from ._util import ( - log, # sub-sys logger +from tractor import ( + msg, + Actor, + Portal, ) +from piker.log import get_logger + +log = get_logger(name=__name__) + +# TODO? default path-space for UDS registry? +# [ ] needs to be Xplatform tho! +# _default_registry_path: Path = ( +# Path(os.environ['XDG_RUNTIME_DIR']) +# /'piker' +# ) + _default_registry_host: str = '127.0.0.1' _default_registry_port: int = 6116 -_default_reg_addr: tuple[str, int] = ( +_default_reg_addr: tuple[ + str, + int, # |str TODO, once we support UDS, see above. +] = ( _default_registry_host, _default_registry_port, ) @@ -75,16 +89,22 @@ async def open_registry( ''' global _tractor_kwargs - actor = tractor.current_actor() - uid = actor.uid - preset_reg_addrs: list[tuple[str, int]] = Registry.addrs + actor: Actor = tractor.current_actor() + aid: msg.Aid = actor.aid + uid: tuple[str, str] = aid.uid + preset_reg_addrs: list[ + tuple[str, int] + ] = Registry.addrs if ( preset_reg_addrs - and addrs + and + addrs ): if preset_reg_addrs != addrs: # if any(addr in preset_reg_addrs for addr in addrs): - diff: set[tuple[str, int]] = set(preset_reg_addrs) - set(addrs) + diff: set[ + tuple[str, int] + ] = set(preset_reg_addrs) - set(addrs) if diff: log.warning( f'`{uid}` requested only subset of registrars: {addrs}\n' @@ -98,7 +118,6 @@ async def open_registry( ) was_set: bool = False - if ( not tractor.is_root_process() and @@ -115,16 +134,23 @@ async def open_registry( f"`{uid}` registry should already exist but doesn't?" ) - if ( - not Registry.addrs - ): + if not Registry.addrs: was_set = True - Registry.addrs = addrs or [_default_reg_addr] + Registry.addrs = ( + addrs + or + [_default_reg_addr] + ) # NOTE: only spot this seems currently used is inside # `.ui._exec` which is the (eventual qtloops) bootstrapping # with guest mode. - _tractor_kwargs['registry_addrs'] = Registry.addrs + reg_addrs: list[tuple[str, str|int]] = Registry.addrs + # !TODO, a struct-API to stringently allow this only in special + # cases? + # -> better would be to have some way to (atomically) rewrite + # and entire `RuntimeVars`?? ideas welcome obvi.. + _tractor_kwargs['registry_addrs'] = reg_addrs try: yield Registry.addrs @@ -149,7 +175,7 @@ async def find_service( | None ): # try: - reg_addrs: list[tuple[str, int]] + reg_addrs: list[tuple[str, int|str]] async with open_registry( addrs=( registry_addrs @@ -172,15 +198,13 @@ async def find_service( only_first=first_only, # if set only returns single ref ) as maybe_portals: if not maybe_portals: - # log.info( - print( + log.info( f'Could NOT find service {service_name!r} -> {maybe_portals!r}' ) yield None return - # log.info( - print( + log.info( f'Found service {service_name!r} -> {maybe_portals}' ) yield maybe_portals @@ -195,8 +219,7 @@ async def find_service( async def check_for_service( service_name: str, - -) -> None | tuple[str, int]: +) -> None|tuple[str, int]: ''' Service daemon "liveness" predicate. diff --git a/piker/service/_util.py b/piker/service/_util.py index bdf23dab..614e4868 100644 --- a/piker/service/_util.py +++ b/piker/service/_util.py @@ -14,20 +14,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """ -Sub-sys module commons. +Sub-sys module commons (if any ?? Bp). """ -from functools import partial - -from ..log import ( - get_logger, - get_console_log, -) subsys: str = 'piker.service' -log = get_logger(subsys) - -get_console_log = partial( - get_console_log, - name=subsys, -) +# ?TODO, if we were going to keep a `get_console_log()` in here to be +# invoked at `import`-time, how do we dynamically hand in the +# `level=` value? seems too early in the runtime to be injected +# right? diff --git a/piker/service/elastic.py b/piker/service/elastic.py index 902f4fde..f79f5bb3 100644 --- a/piker/service/elastic.py +++ b/piker/service/elastic.py @@ -16,6 +16,7 @@ from __future__ import annotations from contextlib import asynccontextmanager as acm +from pprint import pformat from typing import ( Any, TYPE_CHECKING, @@ -26,12 +27,17 @@ import asks if TYPE_CHECKING: import docker from ._ahab import DockerContainer + from . import ( + Services, + ) -from ._util import log # sub-sys logger -from ._util import ( +from piker.log import ( get_console_log, + get_logger, ) +log = get_logger(name=__name__) + # container level config _config = { @@ -67,7 +73,10 @@ def start_elasticsearch( elastic ''' - get_console_log('info', name=__name__) + get_console_log( + level='info', + name=__name__, + ) dcntr: DockerContainer = client.containers.run( 'piker:elastic', diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index c9f49420..9cfc7a13 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -52,17 +52,18 @@ import pendulum # TODO: import this for specific error set expected by mkts client # import purerpc -from ..data.feed import maybe_open_feed +from piker.data.feed import maybe_open_feed from . import Services -from ._util import ( - log, # sub-sys logger +from piker.log import ( get_console_log, + get_logger, ) if TYPE_CHECKING: import docker from ._ahab import DockerContainer +log = get_logger(name=__name__) # ahabd-supervisor and container level config diff --git a/piker/tsp/_anal.py b/piker/tsp/_anal.py index 30cc6b59..46bee231 100644 --- a/piker/tsp/_anal.py +++ b/piker/tsp/_anal.py @@ -54,10 +54,10 @@ from ..log import ( # for "time series processing" subsys: str = 'piker.tsp' -log = get_logger(subsys) +log = get_logger(name=__name__) get_console_log = partial( get_console_log, - name=subsys, + name=subsys, # activate for subsys-pkg "downward" ) # NOTE: union type-defs to handle generic `numpy` and `polars` types diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py index a47e8e9d..99261342 100644 --- a/piker/tsp/_history.py +++ b/piker/tsp/_history.py @@ -63,8 +63,10 @@ from ..data._sharedmem import ( maybe_open_shm_array, ShmArray, ) -from ..data._source import def_iohlcv_fields -from ..data._sampling import ( +from piker.data._source import ( + def_iohlcv_fields, +) +from piker.data._sampling import ( open_sample_stream, ) @@ -96,7 +98,9 @@ if TYPE_CHECKING: # from .feed import _FeedsBus -log = get_logger(__name__) +log = get_logger( + name=__name__, +) # `ShmArray` buffer sizing configuration: @@ -550,7 +554,7 @@ async def start_backfill( ) # ?TODO, check against venue closure hours # if/when provided by backend? - await tractor.pause() + # await tractor.pause() expected_dur: Interval = ( last_start_dt.subtract( @@ -1320,6 +1324,7 @@ async def manage_history( mkt: MktPair, some_data_ready: trio.Event, feed_is_live: trio.Event, + loglevel: str = 'warning', timeframe: float = 60, # in seconds wait_for_live_timeout: float = 0.5, @@ -1497,6 +1502,7 @@ async def manage_history( # data feed layer that needs to consume it). open_index_stream=True, sub_for_broadcasts=False, + loglevel=loglevel, ) as sample_stream: # register 1s and 1m buffers with the global diff --git a/piker/ui/_app.py b/piker/ui/_app.py index f078163d..e3fb436d 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -33,7 +33,10 @@ from . import _search from ..accounting import unpack_fqme from ..data._symcache import open_symcache from ..data.feed import install_brokerd_search -from ..log import get_logger +from ..log import ( + get_logger, + get_console_log, +) from ..service import maybe_spawn_brokerd from ._exec import run_qtractor @@ -87,6 +90,13 @@ async def _async_main( Provision the "main" widget with initial symbol data and root nursery. """ + # enable chart's console logging + if loglevel: + get_console_log( + level=loglevel, + name=__name__, + ) + # set as singleton _chart._godw = main_widget diff --git a/piker/ui/_cursor.py b/piker/ui/_cursor.py index 7675b2e0..393b58ef 100644 --- a/piker/ui/_cursor.py +++ b/piker/ui/_cursor.py @@ -413,9 +413,18 @@ class Cursor(pg.GraphicsObject): self, item: pg.GraphicsObject, ) -> None: - assert getattr(item, 'delete'), f"{item} must define a ``.delete()``" + assert getattr( + item, + 'delete', + ), f"{item} must define a ``.delete()``" self._hovered.add(item) + def is_hovered( + self, + item: pg.GraphicsObject, + ) -> bool: + return item in self._hovered + def add_plot( self, plot: ChartPlotWidget, # noqa diff --git a/piker/ui/_dataviz.py b/piker/ui/_dataviz.py index 36251e48..cc4529be 100644 --- a/piker/ui/_dataviz.py +++ b/piker/ui/_dataviz.py @@ -45,7 +45,7 @@ from piker.ui.qt import QLineF from ..data._sharedmem import ( ShmArray, ) -from ..data.feed import Flume +from ..data.flows import Flume from ..data._formatters import ( IncrementalFormatter, OHLCBarsFmtr, # Plain OHLC renderer diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 690bfb18..9cedcc63 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -21,6 +21,7 @@ this module ties together quote and computational (fsp) streams with graphics update methods via our custom ``pyqtgraph`` charting api. ''' +from functools import partial import itertools from math import floor import time @@ -208,6 +209,7 @@ class DisplayState(Struct): async def increment_history_view( # min_istream: tractor.MsgStream, ds: DisplayState, + loglevel: str = 'warning', ): hist_chart: ChartPlotWidget = ds.hist_chart hist_viz: Viz = ds.hist_viz @@ -229,7 +231,10 @@ async def increment_history_view( hist_viz.reset_graphics() # hist_viz.update_graphics(force_redraw=True) - async with open_sample_stream(1.) as min_istream: + async with open_sample_stream( + period_s=1., + loglevel=loglevel, + ) as min_istream: async for msg in min_istream: profiler = Profiler( @@ -310,7 +315,6 @@ async def increment_history_view( async def graphics_update_loop( - dss: dict[str, DisplayState], nurse: trio.Nursery, godwidget: GodWidget, @@ -319,6 +323,7 @@ async def graphics_update_loop( pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {}, vlm_charts: dict[str, ChartPlotWidget] = {}, + loglevel: str = 'warning', ) -> None: ''' @@ -462,9 +467,12 @@ async def graphics_update_loop( # }) nurse.start_soon( - increment_history_view, - # min_istream, - ds, + partial( + increment_history_view, + # min_istream, + ds=ds, + loglevel=loglevel, + ), ) await trio.sleep(0) @@ -511,14 +519,19 @@ async def graphics_update_loop( fast_chart.linked.isHidden() or not rt_pi.isVisible() ): - print(f'{fqme} skipping update for HIDDEN CHART') + log.debug( + f'{fqme} skipping update for HIDDEN CHART' + ) fast_chart.pause_all_feeds() continue ic = fast_chart.view._in_interact if ic: fast_chart.pause_all_feeds() - print(f'{fqme} PAUSING DURING INTERACTION') + log.debug( + f'Pausing chart updaates during interaction\n' + f'fqme: {fqme!r}' + ) await ic.wait() fast_chart.resume_all_feeds() @@ -1591,15 +1604,18 @@ async def display_symbol_data( # start update loop task dss: dict[str, DisplayState] = {} ln.start_soon( - graphics_update_loop, - dss, - ln, - godwidget, - feed, - # min_istream, + partial( + graphics_update_loop, + dss=dss, + nurse=ln, + godwidget=godwidget, + feed=feed, + # min_istream, - pis, - vlm_charts, + pis=pis, + vlm_charts=vlm_charts, + loglevel=loglevel, + ) ) # boot order-mode diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 3a1a80a5..7a2df5e6 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -183,13 +183,17 @@ async def open_fsp_sidepane( @acm async def open_fsp_actor_cluster( - names: list[str] = ['fsp_0', 'fsp_1'], + names: list[str] = [ + 'fsp_0', + 'fsp_1', + ], ) -> AsyncGenerator[ int, dict[str, tractor.Portal] ]: + # TODO! change to .experimental! from tractor._clustering import open_actor_cluster # profiler = Profiler( @@ -197,7 +201,7 @@ async def open_fsp_actor_cluster( # disabled=False # ) async with open_actor_cluster( - count=2, + count=len(names), names=names, modules=['piker.fsp._engine'], @@ -497,7 +501,8 @@ class FspAdmin: portal: tractor.Portal = ( self.cluster.get(worker_name) - or self.rr_next_portal() + or + self.rr_next_portal() ) # TODO: this should probably be turned into a diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index 9bd48139..2e3107ac 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -43,6 +43,7 @@ from pyqtgraph import ( functions as fn, ) import numpy as np +import tractor import trio from piker.ui.qt import ( @@ -72,7 +73,10 @@ if TYPE_CHECKING: GodWidget, ) from ._dataviz import Viz - from .order_mode import OrderMode + from .order_mode import ( + OrderMode, + Dialog, + ) from ._display import DisplayState @@ -130,7 +134,12 @@ async def handle_viewmode_kb_inputs( async for kbmsg in recv_chan: event, etype, key, mods, text = kbmsg.to_tuple() - log.debug(f'key: {key}, mods: {mods}, text: {text}') + log.debug( + f'View-mode kb-msg received,\n' + f'mods: {mods!r}\n' + f'key: {key!r}\n' + f'text: {text!r}\n' + ) now = time.time() period = now - last @@ -158,8 +167,12 @@ async def handle_viewmode_kb_inputs( # have no previous keys or we do and the min_tap period is # met if ( - not fast_key_seq or - period <= min_tap and fast_key_seq + not fast_key_seq + or ( + period <= min_tap + and + fast_key_seq + ) ): fast_key_seq.append(text) log.debug(f'fast keys seqs {fast_key_seq}') @@ -174,7 +187,8 @@ async def handle_viewmode_kb_inputs( # UI REPL-shell, with ctrl-p (for "pause") if ( ctrl - and key in { + and + key in { Qt.Key_P, } ): @@ -184,7 +198,6 @@ async def handle_viewmode_kb_inputs( vlm_chart = chart.linked.subplots['volume'] # noqa vlm_viz = vlm_chart.main_viz # noqa dvlm_pi = vlm_chart._vizs['dolla_vlm'].plot # noqa - import tractor await tractor.pause() view.interact_graphics_cycle() @@ -192,7 +205,8 @@ async def handle_viewmode_kb_inputs( # shown data `Viz`s for the current chart app. if ( ctrl - and key in { + and + key in { Qt.Key_R, } ): @@ -231,7 +245,8 @@ async def handle_viewmode_kb_inputs( key == Qt.Key_Escape or ( ctrl - and key == Qt.Key_C + and + key == Qt.Key_C ) ): # ctrl-c as cancel @@ -242,17 +257,35 @@ async def handle_viewmode_kb_inputs( # cancel order or clear graphics if ( key == Qt.Key_C - or key == Qt.Key_Delete + or + key == Qt.Key_Delete ): + # log.info('Handling hotkey!') + try: + dialogs: list[Dialog] = order_mode.cancel_orders_under_cursor() + except BaseException: + log.exception('Failed to cancel orders !?\n') + await tractor.pause() - order_mode.cancel_orders_under_cursor() + if not dialogs: + log.warning( + 'No orders were cancelled?\n' + 'Is there an order-line under the cursor?\n' + 'If you think there IS your DE might be "hiding the mouse" before ' + 'we rx the keyboard input via Qt..\n' + '=> Check your DE and/or TWM settings to be sure! <=\n' + ) + # ^TODO?, some way to detect if there's lines and + # the DE is cuckin with things? + # await tractor.pause() # View modes if ( ctrl and ( key == Qt.Key_Equal - or key == Qt.Key_I + or + key == Qt.Key_I ) ): view.wheelEvent( @@ -264,7 +297,8 @@ async def handle_viewmode_kb_inputs( ctrl and ( key == Qt.Key_Minus - or key == Qt.Key_O + or + key == Qt.Key_O ) ): view.wheelEvent( @@ -275,7 +309,8 @@ async def handle_viewmode_kb_inputs( elif ( not ctrl - and key == Qt.Key_R + and + key == Qt.Key_R ): # NOTE: seems that if we don't yield a Qt render # cycle then the m4 downsampled curves will show here @@ -477,7 +512,8 @@ async def handle_viewmode_mouse( # view.raiseContextMenu(event) if ( - view.order_mode.active and + view.order_mode.active + and button == QtCore.Qt.LeftButton ): # when in order mode, submit execution @@ -781,7 +817,8 @@ class ChartView(ViewBox): # Scale or translate based on mouse button if btn & ( - QtCore.Qt.LeftButton | QtCore.Qt.MidButton + QtCore.Qt.LeftButton + | QtCore.Qt.MidButton ): # zoom y-axis ONLY when click-n-drag on it # if axis == 1: diff --git a/piker/ui/_lines.py b/piker/ui/_lines.py index e1b6d3ed..f0b7b126 100644 --- a/piker/ui/_lines.py +++ b/piker/ui/_lines.py @@ -52,10 +52,13 @@ from ._anchors import ( from ..calc import humanize from ._label import Label from ._style import hcolor, _font +from ..log import get_logger if TYPE_CHECKING: from ._cursor import Cursor +log = get_logger(__name__) + # TODO: probably worth investigating if we can # make .boundingRect() faster: @@ -347,7 +350,7 @@ class LevelLine(pg.InfiniteLine): ) -> None: # TODO: enter labels edit mode - print(f'double click {ev}') + log.debug(f'double click {ev}') def paint( self, @@ -461,10 +464,19 @@ class LevelLine(pg.InfiniteLine): # hovered if ( not ev.isExit() - and ev.acceptDrags(QtCore.Qt.LeftButton) + and + ev.acceptDrags(QtCore.Qt.LeftButton) ): # if already hovered we don't need to run again - if self.mouseHovering is True: + if ( + self.mouseHovering is True + and + cur.is_hovered(self) + ): + log.debug( + f'Already hovering ??\n' + f'cur._hovered: {cur._hovered!r}\n' + ) return if self.only_show_markers_on_hover: @@ -481,6 +493,7 @@ class LevelLine(pg.InfiniteLine): cur._y_label_update = False # add us to cursor state + log.debug(f'Adding line {self!r}\n') cur.add_hovered(self) if self._hide_xhair_on_hover: @@ -508,6 +521,7 @@ class LevelLine(pg.InfiniteLine): self.currentPen = self.pen + log.debug(f'Removing line {self!r}\n') cur._hovered.remove(self) if self.only_show_markers_on_hover: diff --git a/piker/ui/_widget.py b/piker/ui/_widget.py index b6a7322e..6d5c2b13 100644 --- a/piker/ui/_widget.py +++ b/piker/ui/_widget.py @@ -300,7 +300,10 @@ class GodWidget(QWidget): getattr(widget, 'on_resize') self._widgets[widget.mode_name] = widget - def on_win_resize(self, event: QtCore.QEvent) -> None: + def on_win_resize( + self, + event: QtCore.QEvent, + ) -> None: ''' Top level god widget handler from window (the real yaweh) resize events such that any registered widgets which wish to be @@ -315,7 +318,10 @@ class GodWidget(QWidget): self._resizing = True - log.info('God widget resize') + log.debug( + f'God widget resize\n' + f'{event}\n' + ) for name, widget in self._widgets.items(): widget.on_resize() diff --git a/piker/ui/_window.py b/piker/ui/_window.py index 39335092..aa2b9ea0 100644 --- a/piker/ui/_window.py +++ b/piker/ui/_window.py @@ -255,8 +255,16 @@ class MainWindow(QMainWindow): current: QWidget, ) -> None: + ''' + Focus handler. - log.info(f'widget focus changed from {last} -> {current}') + For now updates the "current mode" name. + + ''' + log.debug( + f'widget focus changed from,\n' + f'{last} -> {current}' + ) if current is not None: # cursor left window? diff --git a/piker/ui/cli.py b/piker/ui/cli.py index dfc7c7ea..d2072900 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -177,7 +177,7 @@ def chart( return # global opts - brokernames = config['brokers'] + # brokernames: list[str] = config['brokers'] brokermods = config['brokermods'] assert brokermods tractorloglevel = config['tractorloglevel'] @@ -216,6 +216,7 @@ def chart( layers['tcp']['port'], )) + # breakpoint() from tractor.devx import maybe_open_crash_handler pdb: bool = config['pdb'] with maybe_open_crash_handler(pdb=pdb): diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 76bee0ef..0f655749 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -77,7 +77,6 @@ from ._style import _font from ._forms import open_form_input_handling from ._notify import notify_from_ems_status_msg - if TYPE_CHECKING: from ._chart import ( ChartPlotWidget, @@ -436,7 +435,7 @@ class OrderMode: lines=lines, last_status_close=self.multistatus.open_status( f'submitting {order.exec_mode}-{order.action}', - final_msg=f'submitted {order.exec_mode}-{order.action}', + # final_msg=f'submitted {order.exec_mode}-{order.action}', clear_on_next=True, ) ) @@ -514,13 +513,14 @@ class OrderMode: def on_submit( self, uuid: str, - order: Order | None = None, + order: Order|None = None, - ) -> Dialog | None: + ) -> Dialog|None: ''' Order submitted status event handler. - Commit the order line and registered order uuid, store ack time stamp. + Commit the order line and registered order uuid, store ack + time stamp. ''' lines = self.lines.commit_line(uuid) @@ -528,7 +528,7 @@ class OrderMode: # a submission is the start of a new order dialog dialog = self.dialogs[uuid] dialog.lines = lines - cls: Callable | None = dialog.last_status_close + cls: Callable|None = dialog.last_status_close if cls: cls() @@ -658,7 +658,7 @@ class OrderMode: return True - def cancel_orders_under_cursor(self) -> list[str]: + def cancel_orders_under_cursor(self) -> list[Dialog]: return self.cancel_orders( self.oids_from_lines( self.lines.lines_under_cursor() @@ -687,24 +687,28 @@ class OrderMode: self, oids: list[str], - ) -> None: + ) -> list[Dialog]: ''' Cancel all orders from a list of order ids: `oids`. ''' - key = self.multistatus.open_status( - f'cancelling {len(oids)} orders', - final_msg=f'cancelled orders:\n{oids}', - group_key=True - ) + # key = self.multistatus.open_status( + # f'cancelling {len(oids)} orders', + # final_msg=f'cancelled orders:\n{oids}', + # group_key=True + # ) + dialogs: list[Dialog] = [] for oid in oids: if dialog := self.dialogs.get(oid): self.client.cancel_nowait(uuid=oid) - cancel_status_close = self.multistatus.open_status( - f'cancelling order {oid}', - group_key=key, - ) - dialog.last_status_close = cancel_status_close + # cancel_status_close = self.multistatus.open_status( + # f'cancelling order {oid}', + # group_key=key, + # ) + # dialog.last_status_close = cancel_status_close + dialogs.append(dialog) + + return dialogs def cancel_all_orders(self) -> None: ''' @@ -776,7 +780,6 @@ class OrderMode: @asynccontextmanager async def open_order_mode( - feed: Feed, godw: GodWidget, fqme: str, diff --git a/uv.lock b/uv.lock index 9de1420d..6807bbdc 100644 --- a/uv.lock +++ b/uv.lock @@ -1159,11 +1159,11 @@ uis = [ [[package]] name = "platformdirs" -version = "4.5.1" +version = "4.6.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/cf/86/0248f086a84f01b37aaec0fa567b397df1a119f73c16f6c7a9aac73ea309/platformdirs-4.5.1.tar.gz", hash = "sha256:61d5cdcc6065745cdd94f0f878977f8de9437be93de97c1c12f853c9c0cdcbda", size = 21715, upload-time = "2025-12-05T13:52:58.638Z" } +sdist = { url = "https://files.pythonhosted.org/packages/20/e5/474d0a8508029286b905622e6929470fb84337cfa08f9d09fbb624515249/platformdirs-4.6.0.tar.gz", hash = "sha256:4a13c2db1071e5846c3b3e04e5b095c0de36b2a24be9a3bc0145ca66fce4e328", size = 23433, upload-time = "2026-02-12T14:36:21.288Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/cb/28/3bfe2fa5a7b9c46fe7e13c97bda14c895fb10fa2ebf1d0abb90e0cea7ee1/platformdirs-4.5.1-py3-none-any.whl", hash = "sha256:d03afa3963c806a9bed9d5125c8f4cb2fdaf74a55ab60e5d59b3fde758104d31", size = 18731, upload-time = "2025-12-05T13:52:56.823Z" }, + { url = "https://files.pythonhosted.org/packages/da/10/1b0dcf51427326f70e50d98df21b18c228117a743a1fc515a42f8dc7d342/platformdirs-4.6.0-py3-none-any.whl", hash = "sha256:dd7f808d828e1764a22ebff09e60f175ee3c41876606a6132a688d809c7c9c73", size = 19549, upload-time = "2026-02-12T14:36:19.743Z" }, ] [[package]]