From 27df649fbfda30bcd6b331400dcfe395a8476000 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Oct 2024 16:04:34 -0400 Subject: [PATCH] .clearing._ems: Don't require `first_quote['last']` Instead just check for the field (which i'm not huge on the key-name for anyway) and if not found get the "last price" from the real-time shm buffer's latest 'close' sample. Unrelatedly, use a `subs.copy()` in the `Router.client_broadcast()` loop such that if a `client_stream` is popped on connection failure, we don't RTE for the "size changed on iteration". --- piker/clearing/_ems.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3f7045fa..af5fe690 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -653,7 +653,11 @@ class Router(Struct): flume = feed.flumes[fqme] first_quote: dict = flume.first_quote book: DarkBook = self.get_dark_book(broker) - book.lasts[fqme]: float = float(first_quote['last']) + + if not (last := first_quote.get('last')): + last: float = flume.rt_shm.array[-1]['close'] + + book.lasts[fqme]: float = float(last) async with self.maybe_open_brokerd_dialog( brokermod=brokermod, @@ -716,7 +720,7 @@ class Router(Struct): subs = self.subscribers[sub_key] sent_some: bool = False - for client_stream in subs: + for client_stream in subs.copy(): try: await client_stream.send(msg) sent_some = True @@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await router.client_broadcast( - status_msg.req.symbol, - status_msg, - ) + if not status_msg.req: + # likely some order change state? + await tractor.pause() + else: + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!')