diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 9bb2aa74..6d8f645e 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -168,7 +168,6 @@ class OrderClient(Struct): async def relay_orders_from_sync_code( - client: OrderClient, symbol_key: str, to_ems_stream: tractor.MsgStream, @@ -242,6 +241,11 @@ async def open_ems( async with maybe_open_emsd( broker, + # XXX NOTE, LOL so this determines the daemon `emsd` loglevel + # then FYI.. that's kinda wrong no? + # -[ ] shouldn't it be set by `pikerd -l` or no? + # -[ ] would make a lot more sense to have a subsys ctl for + # levels.. like `-l emsd.info` or something? loglevel=loglevel, ) as portal: diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3f7045fa..af5fe690 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -653,7 +653,11 @@ class Router(Struct): flume = feed.flumes[fqme] first_quote: dict = flume.first_quote book: DarkBook = self.get_dark_book(broker) - book.lasts[fqme]: float = float(first_quote['last']) + + if not (last := first_quote.get('last')): + last: float = flume.rt_shm.array[-1]['close'] + + book.lasts[fqme]: float = float(last) async with self.maybe_open_brokerd_dialog( brokermod=brokermod, @@ -716,7 +720,7 @@ class Router(Struct): subs = self.subscribers[sub_key] sent_some: bool = False - for client_stream in subs: + for client_stream in subs.copy(): try: await client_stream.send(msg) sent_some = True @@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await router.client_broadcast( - status_msg.req.symbol, - status_msg, - ) + if not status_msg.req: + # likely some order change state? + await tractor.pause() + else: + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!') diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 0393b2e6..e303d76c 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -297,6 +297,8 @@ class PaperBoi(Struct): # transmit pp msg to ems pp: Position = self.acnt.pps[bs_mktid] + # TODO, this will break if `require_only=True` was passed to + # `.update_from_ledger()` pp_msg = BrokerdPosition( broker=self.broker, diff --git a/piker/clearing/_util.py b/piker/clearing/_util.py index 962861e8..c82a01aa 100644 --- a/piker/clearing/_util.py +++ b/piker/clearing/_util.py @@ -30,6 +30,7 @@ subsys: str = 'piker.clearing' log = get_logger(subsys) +# TODO, oof doesn't this ignore the `loglevel` then??? get_console_log = partial( get_console_log, name=subsys, diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 256b35af..4d886fbc 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -273,7 +273,7 @@ async def _reconnect_forever( nobsws._connected.set() await trio.sleep_forever() except HandshakeError: - log.exception(f'Retrying connection') + log.exception('Retrying connection') # ws & nursery block ends @@ -359,8 +359,8 @@ async def open_autorecon_ws( ''' -JSONRPC response-request style machinery for transparent multiplexing of msgs -over a NoBsWs. +JSONRPC response-request style machinery for transparent multiplexing +of msgs over a `NoBsWs`. ''' @@ -377,43 +377,82 @@ async def open_jsonrpc_session( url: str, start_id: int = 0, response_type: type = JSONRPCResult, - request_type: Optional[type] = None, - request_hook: Optional[Callable] = None, - error_hook: Optional[Callable] = None, + msg_recv_timeout: float = float('inf'), + # ^NOTE, since only `deribit` is using this jsonrpc stuff atm + # and options mkts are generally "slow moving".. + # + # FURTHER if we break the underlying ws connection then since we + # don't pass a `fixture` to the task that manages `NoBsWs`, i.e. + # `_reconnect_forever()`, the jsonrpc "transport pipe" get's + # broken and never restored with wtv init sequence is required to + # re-establish a working req-resp session. + ) -> Callable[[str, dict], dict]: + ''' + Init a json-RPC-over-websocket connection to the provided `url`. + + A `json_rpc: Callable[[str, dict], dict` is delivered to the + caller for sending requests and a bg-`trio.Task` handles + processing of response msgs including error reporting/raising in + the parent/caller task. + + ''' + # NOTE, store all request msgs so we can raise errors on the + # caller side! + req_msgs: dict[int, dict] = {} async with ( - trio.open_nursery() as n, - open_autorecon_ws(url) as ws + trio.open_nursery() as tn, + open_autorecon_ws( + url=url, + msg_recv_timeout=msg_recv_timeout, + ) as ws ): - rpc_id: Iterable = count(start_id) + rpc_id: Iterable[int] = count(start_id) rpc_results: dict[int, dict] = {} - async def json_rpc(method: str, params: dict) -> dict: + async def json_rpc( + method: str, + params: dict, + ) -> dict: ''' perform a json rpc call and wait for the result, raise exception in case of error field present on response ''' + nonlocal req_msgs + + req_id: int = next(rpc_id) msg = { 'jsonrpc': '2.0', - 'id': next(rpc_id), + 'id': req_id, 'method': method, 'params': params } _id = msg['id'] - rpc_results[_id] = { + result = rpc_results[_id] = { 'result': None, - 'event': trio.Event() + 'error': None, + 'event': trio.Event(), # signal caller resp arrived } + req_msgs[_id] = msg await ws.send_msg(msg) + # wait for reponse before unblocking requester code await rpc_results[_id]['event'].wait() - ret = rpc_results[_id]['result'] + if (maybe_result := result['result']): + ret = maybe_result + del rpc_results[_id] - del rpc_results[_id] + else: + err = result['error'] + raise Exception( + f'JSONRPC request failed\n' + f'req: {msg}\n' + f'resp: {err}\n' + ) if ret.error is not None: raise Exception(json.dumps(ret.error, indent=4)) @@ -428,6 +467,7 @@ async def open_jsonrpc_session( the server side. ''' + nonlocal req_msgs async for msg in ws: match msg: case { @@ -451,19 +491,28 @@ async def open_jsonrpc_session( 'params': _, }: log.debug(f'Recieved\n{msg}') - if request_hook: - await request_hook(request_type(**msg)) case { 'error': error }: - log.warning(f'Recieved\n{error}') - if error_hook: - await error_hook(response_type(**msg)) + # retreive orig request msg, set error + # response in original "result" msg, + # THEN FINALLY set the event to signal caller + # to raise the error in the parent task. + req_id: int = error['id'] + req_msg: dict = req_msgs[req_id] + result: dict = rpc_results[req_id] + result['error'] = error + result['event'].set() + log.error( + f'JSONRPC request failed\n' + f'req: {req_msg}\n' + f'resp: {error}\n' + ) case _: log.warning(f'Unhandled JSON-RPC msg!?\n{msg}') - n.start_soon(recv_task) + tn.start_soon(recv_task) yield json_rpc - n.cancel_scope.cancel() + tn.cancel_scope.cancel()