From 76735189de38b319b2deb2d3209e54bf38759746 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 9 Nov 2024 15:14:34 -0500 Subject: [PATCH 1/6] data._web_bs: try to raise jsonrpc errors in parent task --- piker/data/_web_bs.py | 67 ++++++++++++++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 17 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 256b35af..a401588d 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -273,7 +273,7 @@ async def _reconnect_forever( nobsws._connected.set() await trio.sleep_forever() except HandshakeError: - log.exception(f'Retrying connection') + log.exception('Retrying connection') # ws & nursery block ends @@ -359,8 +359,8 @@ async def open_autorecon_ws( ''' -JSONRPC response-request style machinery for transparent multiplexing of msgs -over a NoBsWs. +JSONRPC response-request style machinery for transparent multiplexing +of msgs over a NoBsWs. ''' @@ -377,16 +377,20 @@ async def open_jsonrpc_session( url: str, start_id: int = 0, response_type: type = JSONRPCResult, - request_type: Optional[type] = None, - request_hook: Optional[Callable] = None, - error_hook: Optional[Callable] = None, + # request_type: Optional[type] = None, + # request_hook: Optional[Callable] = None, + # error_hook: Optional[Callable] = None, ) -> Callable[[str, dict], dict]: + # NOTE, store all request msgs so we can raise errors on the + # caller side! + req_msgs: dict[int, dict] = {} + async with ( trio.open_nursery() as n, open_autorecon_ws(url) as ws ): - rpc_id: Iterable = count(start_id) + rpc_id: Iterable[int] = count(start_id) rpc_results: dict[int, dict] = {} async def json_rpc(method: str, params: dict) -> dict: @@ -394,26 +398,40 @@ async def open_jsonrpc_session( perform a json rpc call and wait for the result, raise exception in case of error field present on response ''' + nonlocal req_msgs + + req_id: int = next(rpc_id) msg = { 'jsonrpc': '2.0', - 'id': next(rpc_id), + 'id': req_id, 'method': method, 'params': params } _id = msg['id'] - rpc_results[_id] = { + result = rpc_results[_id] = { 'result': None, - 'event': trio.Event() + 'error': None, + 'event': trio.Event(), # signal caller resp arrived } + req_msgs[_id] = msg await ws.send_msg(msg) + # wait for reponse before unblocking requester code await rpc_results[_id]['event'].wait() - ret = rpc_results[_id]['result'] + if (maybe_result := result['result']): + ret = maybe_result + del rpc_results[_id] - del rpc_results[_id] + else: + err = result['error'] + raise Exception( + f'JSONRPC request failed\n' + f'req: {msg}\n' + f'resp: {err}\n' + ) if ret.error is not None: raise Exception(json.dumps(ret.error, indent=4)) @@ -428,6 +446,7 @@ async def open_jsonrpc_session( the server side. ''' + nonlocal req_msgs async for msg in ws: match msg: case { @@ -451,15 +470,29 @@ async def open_jsonrpc_session( 'params': _, }: log.debug(f'Recieved\n{msg}') - if request_hook: - await request_hook(request_type(**msg)) + # if request_hook: + # await request_hook(request_type(**msg)) case { 'error': error }: - log.warning(f'Recieved\n{error}') - if error_hook: - await error_hook(response_type(**msg)) + # if error_hook: + # await error_hook(response_type(**msg)) + + # retreive orig request msg, set error + # response in original "result" msg, + # THEN FINALLY set the event to signal caller + # to raise the error in the parent task. + req_id: int = error['id'] + req_msg: dict = req_msgs[req_id] + result: dict = rpc_results[req_id] + result['error'] = error + result['event'].set() + log.error( + f'JSONRPC request failed\n' + f'req: {req_msg}\n' + f'resp: {error}\n' + ) case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') -- 2.34.1 From 5633f5614d8c292c034c20545e20f5b488d65c7e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Feb 2025 11:07:27 -0500 Subject: [PATCH 2/6] Doc-n-clean `.data._web_bs.open_jsonrpc_session()` Add a doc-string reflecting recent refinements, drop all the old hook params, rename `n: trio.Nursery` -> `tn` for "task nursery" fitting with code base's naming style. --- piker/data/_web_bs.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index a401588d..e9d2deeb 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -377,17 +377,22 @@ async def open_jsonrpc_session( url: str, start_id: int = 0, response_type: type = JSONRPCResult, - # request_type: Optional[type] = None, - # request_hook: Optional[Callable] = None, - # error_hook: Optional[Callable] = None, ) -> Callable[[str, dict], dict]: + ''' + Init a json-RPC-over-websocket connection to the provided `url`. + A `json_rpc: Callable[[str, dict], dict` is delivered to the + caller for sending requests and a bg-`trio.Task` handles + processing of response msgs including error reporting/raising in + the parent/caller task. + + ''' # NOTE, store all request msgs so we can raise errors on the # caller side! req_msgs: dict[int, dict] = {} async with ( - trio.open_nursery() as n, + trio.open_nursery() as tn, open_autorecon_ws(url) as ws ): rpc_id: Iterable[int] = count(start_id) @@ -470,15 +475,10 @@ async def open_jsonrpc_session( 'params': _, }: log.debug(f'Recieved\n{msg}') - # if request_hook: - # await request_hook(request_type(**msg)) case { 'error': error }: - # if error_hook: - # await error_hook(response_type(**msg)) - # retreive orig request msg, set error # response in original "result" msg, # THEN FINALLY set the event to signal caller @@ -497,6 +497,6 @@ async def open_jsonrpc_session( case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') - n.start_soon(recv_task) + tn.start_soon(recv_task) yield json_rpc - n.cancel_scope.cancel() + tn.cancel_scope.cancel() -- 2.34.1 From e391c896f847dda05f45f95dfdf981273a7ebacf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 16:58:40 -0500 Subject: [PATCH 3/6] Mk jsronrpc's underlying ws timeout `float('inf')` Since currently we're only using this IPC subsys for `deribit`, and generally speaking we're primarly supporting options markets (which are fairly "slow moving"), flip to a default of NOT resetting the `NoBsWs` on timeout since doing so normally breaks the jsron-rpc IPC session. Without a proper `fixture` passed to `open_autorecon_ws()` (which we should eventually implement!!) relying on a timeout-to-reset more or less will just cause breakage issues - a proper reconnect sequence must be implemented before using that feature. Deats, - expose and proxy through the `msg_recv_timeout` from `open_jsonrpc_session()` into the underlying `open_autorecon_ws()` call. --- piker/data/_web_bs.py | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index e9d2deeb..4d886fbc 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -360,7 +360,7 @@ async def open_autorecon_ws( ''' JSONRPC response-request style machinery for transparent multiplexing -of msgs over a NoBsWs. +of msgs over a `NoBsWs`. ''' @@ -377,6 +377,16 @@ async def open_jsonrpc_session( url: str, start_id: int = 0, response_type: type = JSONRPCResult, + msg_recv_timeout: float = float('inf'), + # ^NOTE, since only `deribit` is using this jsonrpc stuff atm + # and options mkts are generally "slow moving".. + # + # FURTHER if we break the underlying ws connection then since we + # don't pass a `fixture` to the task that manages `NoBsWs`, i.e. + # `_reconnect_forever()`, the jsonrpc "transport pipe" get's + # broken and never restored with wtv init sequence is required to + # re-establish a working req-resp session. + ) -> Callable[[str, dict], dict]: ''' Init a json-RPC-over-websocket connection to the provided `url`. @@ -393,12 +403,18 @@ async def open_jsonrpc_session( async with ( trio.open_nursery() as tn, - open_autorecon_ws(url) as ws + open_autorecon_ws( + url=url, + msg_recv_timeout=msg_recv_timeout, + ) as ws ): rpc_id: Iterable[int] = count(start_id) rpc_results: dict[int, dict] = {} - async def json_rpc(method: str, params: dict) -> dict: + async def json_rpc( + method: str, + params: dict, + ) -> dict: ''' perform a json rpc call and wait for the result, raise exception in case of error field present on response -- 2.34.1 From 4d15b9bfddee16be716e3e7562626a6eec8043d9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Oct 2024 12:49:20 -0400 Subject: [PATCH 4/6] Allow ledger passes to ignore (symcache) unknown fqmes For example in the paper-eng, if you have a backend that doesn't fully support a symcache (yet) it's handy to be able to ignore processing other paper-eng txns when all you care about at the moment is the simulated symbol. NOTE, that currently this will still result in a key-error when you load more then one mkt with the paper engine (for which the backend does not have the symcache implemented) since no fqme ad-hoc query was made for the 2nd symbol (and i'm not sure we should support that kinda hackery over just encouraging the sym-cache being added?). Def needs a little more thought depending on how many backends are never going to be able to (easily) support caching.. --- piker/accounting/_pos.py | 45 +++++++++++++++++++++++++++++---- piker/clearing/_paper_engine.py | 1 + 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index 1b305009..5952418f 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -30,7 +30,8 @@ from types import ModuleType from typing import ( Any, Iterator, - Generator + Generator, + TYPE_CHECKING, ) import pendulum @@ -59,8 +60,10 @@ from ..clearing._messages import ( BrokerdPosition, ) from piker.types import Struct -from piker.data._symcache import SymbologyCache -from ..log import get_logger +from piker.log import get_logger + +if TYPE_CHECKING: + from piker.data._symcache import SymbologyCache log = get_logger(__name__) @@ -493,6 +496,17 @@ class Account(Struct): _mktmap_table: dict[str, MktPair] | None = None, + only_require: list[str]|True = True, + # ^list of fqmes that are "required" to be processed from + # this ledger pass; we often don't care about others and + # definitely shouldn't always error in such cases. + # (eg. broker backend loaded that doesn't yet supsport the + # symcache but also, inside the paper engine we don't ad-hoc + # request `get_mkt_info()` for every symbol in the ledger, + # only the one for which we're simulating against). + # TODO, not sure if there's a better soln for this, ideally + # all backends get symcache support afap i guess.. + ) -> dict[str, Position]: ''' Update the internal `.pps[str, Position]` table from input @@ -535,11 +549,32 @@ class Account(Struct): if _mktmap_table is None: raise + required: bool = ( + only_require is True + or ( + only_require is not True + and + fqme in only_require + ) + ) # XXX: caller is allowed to provide a fallback # mktmap table for the case where a new position is # being added and the preloaded symcache didn't # have this entry prior (eg. with frickin IB..) - mkt = _mktmap_table[fqme] + if ( + not (mkt := _mktmap_table.get(fqme)) + and + required + ): + raise + + elif not required: + continue + + else: + # should be an entry retreived somewhere + assert mkt + if not (pos := pps.get(bs_mktid)): @@ -656,7 +691,7 @@ class Account(Struct): def write_config(self) -> None: ''' Write the current account state to the user's account TOML file, normally - something like ``pps.toml``. + something like `pps.toml`. ''' # TODO: show diff output? diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 0393b2e6..fea9ff8e 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -653,6 +653,7 @@ async def open_trade_dialog( # in) use manually constructed table from calling # the `.get_mkt_info()` provider EP above. _mktmap_table=mkt_by_fqme, + only_require=list(mkt_by_fqme), ) pp_msgs: list[BrokerdPosition] = [] -- 2.34.1 From 8dcdf7c9a9cf47d166e83aebb42d31ae5bbc6811 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 10:59:25 -0500 Subject: [PATCH 5/6] Invert `getattr()` check for `get_mkt_pairs()` ep Such that we `return` early when not defined by the provider backend to reduce an indent level in `SymbologyCache.load()`. --- piker/data/_symcache.py | 108 ++++++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 48 deletions(-) diff --git a/piker/data/_symcache.py b/piker/data/_symcache.py index fc1057a0..bcaa5844 100644 --- a/piker/data/_symcache.py +++ b/piker/data/_symcache.py @@ -31,6 +31,7 @@ from pathlib import Path from pprint import pformat from typing import ( Any, + Callable, Sequence, Hashable, TYPE_CHECKING, @@ -56,7 +57,7 @@ from piker.brokers import ( ) if TYPE_CHECKING: - from ..accounting import ( + from piker.accounting import ( Asset, MktPair, ) @@ -149,57 +150,68 @@ class SymbologyCache(Struct): 'Implement `Client.get_assets()`!' ) - if get_mkt_pairs := getattr(client, 'get_mkt_pairs', None): - - pairs: dict[str, Struct] = await get_mkt_pairs() - for bs_fqme, pair in pairs.items(): - - # NOTE: every backend defined pair should - # declare it's ns path for roundtrip - # serialization lookup. - if not getattr(pair, 'ns_path', None): - raise TypeError( - f'Pair-struct for {self.mod.name} MUST define a ' - '`.ns_path: str`!\n' - f'{pair}' - ) - - entry = await self.mod.get_mkt_info(pair.bs_fqme) - if not entry: - continue - - mkt: MktPair - pair: Struct - mkt, _pair = entry - assert _pair is pair, ( - f'`{self.mod.name}` backend probably has a ' - 'keying-symmetry problem between the pair-`Struct` ' - 'returned from `Client.get_mkt_pairs()`and the ' - 'module level endpoint: `.get_mkt_info()`\n\n' - "Here's the struct diff:\n" - f'{_pair - pair}' - ) - # NOTE XXX: this means backends MUST implement - # a `Struct.bs_mktid: str` field to provide - # a native-keyed map to their own symbol - # set(s). - self.pairs[pair.bs_mktid] = pair - - # NOTE: `MktPair`s are keyed here using piker's - # internal FQME schema so that search, - # accounting and feed init can be accomplished - # a sane, uniform, normalized basis. - self.mktmaps[mkt.fqme] = mkt - - self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref( - pair, - ) - - else: + get_mkt_pairs: Callable|None = getattr( + client, + 'get_mkt_pairs', + None, + ) + if not get_mkt_pairs: log.warning( 'No symbology cache `Pair` support for `{provider}`..\n' 'Implement `Client.get_mkt_pairs()`!' ) + return self + + pairs: dict[str, Struct] = await get_mkt_pairs() + if not pairs: + log.warning( + 'No pairs from intial {provider!r} sym-cache request?\n\n' + '`Client.get_mkt_pairs()` -> {pairs!r} ?' + ) + return self + + for bs_fqme, pair in pairs.items(): + if not getattr(pair, 'ns_path', None): + # XXX: every backend defined pair must declare + # a `.ns_path: tractor.NamespacePath` to enable + # roundtrip serialization lookup from a local + # cache file. + raise TypeError( + f'Pair-struct for {self.mod.name} MUST define a ' + '`.ns_path: str`!\n\n' + f'{pair!r}' + ) + + entry = await self.mod.get_mkt_info(pair.bs_fqme) + if not entry: + continue + + mkt: MktPair + pair: Struct + mkt, _pair = entry + assert _pair is pair, ( + f'`{self.mod.name}` backend probably has a ' + 'keying-symmetry problem between the pair-`Struct` ' + 'returned from `Client.get_mkt_pairs()`and the ' + 'module level endpoint: `.get_mkt_info()`\n\n' + "Here's the struct diff:\n" + f'{_pair - pair}' + ) + # NOTE XXX: this means backends MUST implement + # a `Struct.bs_mktid: str` field to provide + # a native-keyed map to their own symbol + # set(s). + self.pairs[pair.bs_mktid] = pair + + # NOTE: `MktPair`s are keyed here using piker's + # internal FQME schema so that search, + # accounting and feed init can be accomplished + # a sane, uniform, normalized basis. + self.mktmaps[mkt.fqme] = mkt + + self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref( + pair, + ) return self -- 2.34.1 From c3f3b25524765d1a4bf8f119c9a86e0bed04dc85 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 11:04:59 -0500 Subject: [PATCH 6/6] Teensie `piker.data` styling tweaks - use more compact optional value style with `|`-union - fix `.flows` typing-only import since we need `MktPair` to be immediately defined for use on a `msgspec.Struct` field. - more "tree-like" warning msg in `.validate()` reporting. --- piker/data/feed.py | 6 ++---- piker/data/flows.py | 6 +++--- piker/data/validate.py | 6 +++--- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 7264c8e6..d47d8df9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -786,7 +786,6 @@ async def install_brokerd_search( @acm async def maybe_open_feed( - fqmes: list[str], loglevel: str | None = None, @@ -840,13 +839,12 @@ async def maybe_open_feed( @acm async def open_feed( - fqmes: list[str], - loglevel: str | None = None, + loglevel: str|None = None, allow_overruns: bool = True, start_stream: bool = True, - tick_throttle: float | None = None, # Hz + tick_throttle: float|None = None, # Hz allow_remote_ctl_ui: bool = False, diff --git a/piker/data/flows.py b/piker/data/flows.py index 677b2f69..573180b9 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -36,10 +36,10 @@ from ._sharedmem import ( ShmArray, _Token, ) +from piker.accounting import MktPair if TYPE_CHECKING: - from ..accounting import MktPair - from .feed import Feed + from piker.data.feed import Feed class Flume(Struct): @@ -82,7 +82,7 @@ class Flume(Struct): # TODO: do we need this really if we can pull the `Portal` from # ``tractor``'s internals? - feed: Feed | None = None + feed: Feed|None = None @property def rt_shm(self) -> ShmArray: diff --git a/piker/data/validate.py b/piker/data/validate.py index cefa0f1f..c5317ede 100644 --- a/piker/data/validate.py +++ b/piker/data/validate.py @@ -113,9 +113,9 @@ def validate_backend( ) if ep is None: log.warning( - f'Provider backend {mod.name} is missing ' - f'{daemon_name} support :(\n' - f'The following endpoint is missing: {name}' + f'Provider backend {mod.name!r} is missing ' + f'{daemon_name!r} support?\n' + f'|_module endpoint-func missing: {name!r}\n' ) inits: list[ -- 2.34.1