From 9592735aaae2780e8ea5c166532994e1b32af5bb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Oct 2024 16:04:34 -0400 Subject: [PATCH 1/2] .clearing._ems: Don't require `first_quote['last']` Instead just check for the field (which i'm not huge on the key-name for anyway) and if not found get the "last price" from the real-time shm buffer's latest 'close' sample. Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop such that if a `client_stream` is popped on connection failure, we don't RTE for the "size changed on iteration". --- piker/clearing/_ems.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3f7045fa..af5fe690 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -653,7 +653,11 @@ class Router(Struct): flume = feed.flumes[fqme] first_quote: dict = flume.first_quote book: DarkBook = self.get_dark_book(broker) - book.lasts[fqme]: float = float(first_quote['last']) + + if not (last := first_quote.get('last')): + last: float = flume.rt_shm.array[-1]['close'] + + book.lasts[fqme]: float = float(last) async with self.maybe_open_brokerd_dialog( brokermod=brokermod, @@ -716,7 +720,7 @@ class Router(Struct): subs = self.subscribers[sub_key] sent_some: bool = False - for client_stream in subs: + for client_stream in subs.copy(): try: await client_stream.send(msg) sent_some = True @@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await router.client_broadcast( - status_msg.req.symbol, - status_msg, - ) + if not status_msg.req: + # likely some order change state? + await tractor.pause() + else: + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!') From 60390ae59673376bdb98a57487b03c61db1098a7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 10:46:44 -0500 Subject: [PATCH 2/2] Various `.clearing` todos/notes on potential issues with loglevel settings.. --- piker/clearing/_client.py | 6 +++++- piker/clearing/_paper_engine.py | 2 ++ piker/clearing/_util.py | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 9bb2aa74..6d8f645e 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -168,7 +168,6 @@ class OrderClient(Struct): async def relay_orders_from_sync_code( - client: OrderClient, symbol_key: str, to_ems_stream: tractor.MsgStream, @@ -242,6 +241,11 @@ async def open_ems( async with maybe_open_emsd( broker, + # XXX NOTE, LOL so this determines the daemon `emsd` loglevel + # then FYI.. that's kinda wrong no? + # -[ ] shouldn't it be set by `pikerd -l` or no? + # -[ ] would make a lot more sense to have a subsys ctl for + # levels.. like `-l emsd.info` or something? loglevel=loglevel, ) as portal: diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 0393b2e6..e303d76c 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -297,6 +297,8 @@ class PaperBoi(Struct): # transmit pp msg to ems pp: Position = self.acnt.pps[bs_mktid] + # TODO, this will break if `require_only=True` was passed to + # `.update_from_ledger()` pp_msg = BrokerdPosition( broker=self.broker, diff --git a/piker/clearing/_util.py b/piker/clearing/_util.py index 962861e8..c82a01aa 100644 --- a/piker/clearing/_util.py +++ b/piker/clearing/_util.py @@ -30,6 +30,7 @@ subsys: str = 'piker.clearing' log = get_logger(subsys) +# TODO, oof doesn't this ignore the `loglevel` then??? get_console_log = partial( get_console_log, name=subsys,