Flatten the client-request handler loop with `match:`

the_ems_flattening
Tyler Goodlet 2022-08-03 20:13:37 -04:00
parent a9185e7d6f
commit 46c51b55f7
1 changed files with 43 additions and 42 deletions

View File

@ -822,34 +822,25 @@ async def process_client_order_cmds(
# cmd: dict # cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}') log.info(f'Received order cmd:\n{pformat(cmd)}')
action = cmd['action']
oid = cmd['oid'] oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id # register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be # such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory # routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs). # others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid) live_entry = dark_book._ems_entries.get(oid)
# TODO: can't wait for this stuff to land in 3.10 match cmd:
# https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings
if action in ('cancel',):
# check for live-broker order # existing live-broker order cancel
if live_entry: case {
'action': 'cancel',
'oid': oid,
} if live_entry:
reqid = live_entry.reqid reqid = live_entry.reqid
msg = BrokerdCancel( msg = BrokerdCancel(
oid=oid, oid=oid,
reqid=reqid, reqid=reqid,
@ -860,12 +851,10 @@ async def process_client_order_cmds(
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
if reqid is not None: if reqid is not None:
# send cancel to brokerd immediately! # send cancel to brokerd immediately!
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg)
else: else:
@ -876,7 +865,10 @@ async def process_client_order_cmds(
dark_book._ems_entries[oid] = msg dark_book._ems_entries[oid] = msg
# dark trigger cancel # dark trigger cancel
else: case {
'action': 'cancel',
'oid': oid,
} if not live_entry:
try: try:
# remove from dark book clearing # remove from dark book clearing
dark_book.orders[symbol].pop(oid, None) dark_book.orders[symbol].pop(oid, None)
@ -896,25 +888,27 @@ async def process_client_order_cmds(
except KeyError: except KeyError:
log.exception(f'No dark order for {symbol}?') log.exception(f'No dark order for {symbol}?')
# TODO: 3.10 struct-pattern matching and unpacking here # live order submission
elif action in ('alert', 'buy', 'sell',): case {
'oid': oid,
'symbol': fqsn,
'price': trigger_price,
'size': size,
'action': ('buy' | 'sell') as action,
'exec_mode': 'live',
}:
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
msg = Order(**cmd)
broker = msg.brokers[0]
msg = Order(**cmd) # remove the broker part before creating a message
# to send to the specific broker since they probably
fqsn = msg.symbol # aren't expectig their own name, but should they?
trigger_price = msg.price sym = fqsn.replace(f'.{broker}', '')
size = msg.size
exec_mode = msg.exec_mode
broker = msg.brokers[0]
# remove the broker part before creating a message
# to send to the specific broker since they probably
# aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '')
if exec_mode == 'live' and action in ('buy', 'sell',):
if live_entry is not None: if live_entry is not None:
# sanity check on emsd id # sanity check on emsd id
assert live_entry.oid == oid assert live_entry.oid == oid
reqid = live_entry.reqid reqid = live_entry.reqid
@ -954,12 +948,21 @@ async def process_client_order_cmds(
# that live order asap. # that live order asap.
dark_book._ems_entries[oid] = msg dark_book._ems_entries[oid] = msg
# "DARK" triggers # dark-order / alert submission
# submit order to local EMS book and scan loop, case {
# effectively a local clearing engine, which 'oid': oid,
# scans for conditions and triggers matching executions 'symbol': fqsn,
elif exec_mode in ('dark', 'paper') or ( 'price': trigger_price,
action in ('alert') 'size': size,
'exec_mode': exec_mode,
'action': action,
} if (
# "DARK" triggers
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
exec_mode in ('dark', 'paper')
or action == 'alert'
): ):
# Auto-gen scanner predicate: # Auto-gen scanner predicate:
# we automatically figure out what the alert check # we automatically figure out what the alert check
@ -999,10 +1002,8 @@ async def process_client_order_cmds(
abs_diff_away = 0 abs_diff_away = 0
# submit execution/order to EMS scan loop # submit execution/order to EMS scan loop
# NOTE: this may result in an override of an existing # NOTE: this may result in an override of an existing
# dark book entry if the order id already exists # dark book entry if the order id already exists
dark_book.orders.setdefault( dark_book.orders.setdefault(
fqsn, {} fqsn, {}
)[oid] = ( )[oid] = (