From 46c51b55f7598f69b47c01f025008d92a09fe550 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 3 Aug 2022 20:13:37 -0400 Subject: [PATCH] Flatten the client-request handler loop with `match:` --- piker/clearing/_ems.py | 85 +++++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 42 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5f08cfa5..3853ef66 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -822,34 +822,25 @@ async def process_client_order_cmds( # cmd: dict async for cmd in client_order_stream: - log.info(f'Received order cmd:\n{pformat(cmd)}') - action = cmd['action'] oid = cmd['oid'] - - # TODO: make ``tractor.MsgStream`` a frozen type again such that it - # can be stored in sets like the old context was. - # wait, maybe this **is** already working thanks to our parent - # `trio` type? - # register this stream as an active dialogue for this order id # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory # others who are registered for such order affiliated msgs). client_dialogues[oid] = client_order_stream - reqid = dark_book._ems2brokerd_ids.inverse.get(oid) live_entry = dark_book._ems_entries.get(oid) - # TODO: can't wait for this stuff to land in 3.10 - # https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings - if action in ('cancel',): + match cmd: - # check for live-broker order - if live_entry: + # existing live-broker order cancel + case { + 'action': 'cancel', + 'oid': oid, + } if live_entry: reqid = live_entry.reqid - msg = BrokerdCancel( oid=oid, reqid=reqid, @@ -860,12 +851,10 @@ async def process_client_order_cmds( # NOTE: cancel response will be relayed back in messages # from corresponding broker if reqid is not None: - # send cancel to brokerd immediately! log.info( f'Submitting cancel for live order {reqid}' ) - await brokerd_order_stream.send(msg) else: @@ -876,7 +865,10 @@ async def process_client_order_cmds( dark_book._ems_entries[oid] = msg # dark trigger cancel - else: + case { + 'action': 'cancel', + 'oid': oid, + } if not live_entry: try: # remove from dark book clearing dark_book.orders[symbol].pop(oid, None) @@ -896,25 +888,27 @@ async def process_client_order_cmds( except KeyError: log.exception(f'No dark order for {symbol}?') - # TODO: 3.10 struct-pattern matching and unpacking here - elif action in ('alert', 'buy', 'sell',): + # live order submission + case { + 'oid': oid, + 'symbol': fqsn, + 'price': trigger_price, + 'size': size, + 'action': ('buy' | 'sell') as action, + 'exec_mode': 'live', + }: + # TODO: eventually we should be receiving + # this struct on the wire unpacked in a scoped protocol + # setup with ``tractor``. + msg = Order(**cmd) + broker = msg.brokers[0] - msg = Order(**cmd) - - fqsn = msg.symbol - trigger_price = msg.price - size = msg.size - exec_mode = msg.exec_mode - broker = msg.brokers[0] - # remove the broker part before creating a message - # to send to the specific broker since they probably - # aren't expectig their own name, but should they? - sym = fqsn.replace(f'.{broker}', '') - - if exec_mode == 'live' and action in ('buy', 'sell',): + # remove the broker part before creating a message + # to send to the specific broker since they probably + # aren't expectig their own name, but should they? + sym = fqsn.replace(f'.{broker}', '') if live_entry is not None: - # sanity check on emsd id assert live_entry.oid == oid reqid = live_entry.reqid @@ -954,12 +948,21 @@ async def process_client_order_cmds( # that live order asap. dark_book._ems_entries[oid] = msg - # "DARK" triggers - # submit order to local EMS book and scan loop, - # effectively a local clearing engine, which - # scans for conditions and triggers matching executions - elif exec_mode in ('dark', 'paper') or ( - action in ('alert') + # dark-order / alert submission + case { + 'oid': oid, + 'symbol': fqsn, + 'price': trigger_price, + 'size': size, + 'exec_mode': exec_mode, + 'action': action, + } if ( + # "DARK" triggers + # submit order to local EMS book and scan loop, + # effectively a local clearing engine, which + # scans for conditions and triggers matching executions + exec_mode in ('dark', 'paper') + or action == 'alert' ): # Auto-gen scanner predicate: # we automatically figure out what the alert check @@ -999,10 +1002,8 @@ async def process_client_order_cmds( abs_diff_away = 0 # submit execution/order to EMS scan loop - # NOTE: this may result in an override of an existing # dark book entry if the order id already exists - dark_book.orders.setdefault( fqsn, {} )[oid] = (