Merge pull request #372 from pikers/the_ems_flattening

The ems flattening
msgpack_zombie
goodboy 2022-08-05 21:03:59 -04:00 committed by GitHub
commit 9651ca84bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 297 additions and 288 deletions

View File

@ -23,7 +23,11 @@ from dataclasses import dataclass, field
from math import isnan from math import isnan
from pprint import pformat from pprint import pformat
import time import time
from typing import AsyncIterator, Callable from typing import (
AsyncIterator,
Any,
Callable,
)
from bidict import bidict from bidict import bidict
import trio import trio
@ -170,10 +174,9 @@ async def clear_dark_triggers(
) )
): ):
price = tick.get('price') price = tick.get('price')
ttype = tick['type']
# update to keep new cmds informed # update to keep new cmds informed
book.lasts[sym] = price book.lasts[sym] = price
ttype = tick['type']
for oid, ( for oid, (
pred, pred,
@ -189,65 +192,62 @@ async def clear_dark_triggers(
ttype not in tf or ttype not in tf or
not pred(price) not pred(price)
): ):
log.debug( # log.runtime(
f'skipping quote for {sym} ' # f'skipping quote for {sym} '
f'{pred}, {ttype} not in {tf}?, {pred(price)}' # f'{pred} -> {pred(price)}\n'
) # f'{ttype} not in {tf}?'
# )
# majority of iterations will be non-matches # majority of iterations will be non-matches
continue continue
action: str = cmd['action'] match cmd:
symbol: str = cmd['symbol'] # alert: nothing to do but relay a status
bfqsn: str = symbol.replace(f'.{broker}', '') # back to the requesting ems client
case {
'action': 'alert',
}:
resp = 'alert_triggered'
if action == 'alert': # executable order submission
# nothing to do but relay a status case {
# message back to the requesting ems client 'action': action,
resp = 'alert_triggered' 'symbol': symbol,
'account': account,
'size': size,
}:
bfqsn: str = symbol.replace(f'.{broker}', '')
submit_price = price + abs_diff_away
resp = 'dark_triggered' # hidden on client-side
else: # executable order submission log.info(
f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}')
# submit_price = price + price*percent_away live_req = BrokerdOrder(
submit_price = price + abs_diff_away action=action,
oid=oid,
account=account,
time_ns=time.time_ns(),
symbol=bfqsn,
price=submit_price,
size=size,
)
await brokerd_orders_stream.send(live_req)
log.info( # mark this entry as having sent an order
f'Dark order triggered for price {price}\n' # request. the entry will be replaced once the
f'Submitting order @ price {submit_price}') # target broker replies back with
# a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems.
book._ems_entries[oid] = live_req
msg = BrokerdOrder( case _:
action=cmd['action'], raise ValueError(f'Invalid dark book entry: {cmd}')
oid=oid,
account=cmd['account'],
time_ns=time.time_ns(),
# this **creates** new order request for the # fallthrough logic
# underlying broker so we set a "broker resp = Status(
# request id" (``reqid`` kwarg) to ``None`` oid=oid, # ems dialog id
# so that the broker client knows that we
# aren't trying to modify an existing
# order-request and instead create a new one.
reqid=None,
symbol=bfqsn,
price=submit_price,
size=cmd['size'],
)
await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order
# request. the entry will be replaced once the
# target broker replies back with
# a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems.
book._ems_entries[oid] = msg
# our internal status value for client-side
# triggered "dark orders"
resp = 'dark_triggered'
msg = Status(
oid=oid, # ems order id
time_ns=time.time_ns(), time_ns=time.time_ns(),
resp=resp, resp=resp,
trigger_price=price, trigger_price=price,
@ -262,13 +262,14 @@ async def clear_dark_triggers(
f'pred for {oid} was already removed!?' f'pred for {oid} was already removed!?'
) )
# send response to client-side
try: try:
await ems_client_order_stream.send(msg) await ems_client_order_stream.send(resp)
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
): ):
log.warning( log.warning(
f'client {ems_client_order_stream} stream is broke' f'{ems_client_order_stream} stream broke?'
) )
break break
@ -572,98 +573,57 @@ async def translate_and_relay_brokerd_events(
assert relay.brokerd_dialogue == brokerd_trades_stream assert relay.brokerd_dialogue == brokerd_trades_stream
brokerd_msg: dict[str, Any]
async for brokerd_msg in brokerd_trades_stream: async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name']
log.info( log.info(
f'Received broker trade event:\n' f'Received broker trade event:\n'
f'{pformat(brokerd_msg)}' f'{pformat(brokerd_msg)}'
) )
match brokerd_msg:
if name == 'position': # BrokerdPosition
case {
'name': 'position',
'symbol': sym,
'broker': broker,
}:
pos_msg = BrokerdPosition(**brokerd_msg)
pos_msg = BrokerdPosition(**brokerd_msg) # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
# sym, broker = pos_msg.symbol, pos_msg.broker
# XXX: this will be useful for automatic strats yah? relay.positions.setdefault(
# keep pps per account up to date locally in ``emsd`` mem # NOTE: translate to a FQSN!
sym, broker = pos_msg.symbol, pos_msg.broker (broker, sym),
[]
).append(pos_msg)
relay.positions.setdefault( # fan-out-relay position msgs immediately by
# NOTE: translate to a FQSN! # broadcasting updates on all client streams
(broker, sym), for client_stream in router.clients.copy():
[] try:
).append(pos_msg) await client_stream.send(pos_msg)
except(
# fan-out-relay position msgs immediately by trio.ClosedResourceError,
# broadcasting updates on all client streams trio.BrokenResourceError,
for client_stream in router.clients.copy(): ):
try: router.clients.remove(client_stream)
await client_stream.send(pos_msg) log.warning(
except( f'client for {client_stream} was already closed?')
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
continue
# Get the broker (order) request id, this **must** be normalized
# into messaging provided by the broker backend
reqid = brokerd_msg['reqid']
# all piker originated requests will have an ems generated oid field
oid = brokerd_msg.get(
'oid',
book._ems2brokerd_ids.inverse.get(reqid)
)
if oid is None:
# XXX: paper clearing special cases
# paper engine race case: ``Client.submit_limit()`` hasn't
# returned yet and provided an output reqid to register
# locally, so we need to retreive the oid that was already
# packed at submission since we already know it ahead of
# time
paper = brokerd_msg['broker_details'].get('paper_info')
ext = brokerd_msg['broker_details'].get('external')
if paper:
# paperboi keeps the ems id up front
oid = paper['oid']
elif ext:
# may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib)
log.error(f"External trade event {ext}")
continue continue
else: # BrokerdOrderAck
# something is out of order, we don't have an oid for case {
# this broker-side message. 'name': 'ack',
log.error( 'reqid': reqid, # brokerd generated order-request id
f'Unknown oid: {oid} for msg:\n' 'oid': oid, # ems order-dialog id
f'{pformat(brokerd_msg)}\n' } if (
'Unable to relay message to client side!?' entry := book._ems_entries.get(oid)
) ):
# initial response to brokerd order request
else: # if name == 'ack':
# check for existing live flow entry
entry = book._ems_entries.get(oid)
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# initial response to brokerd order request
if name == 'ack':
# register the brokerd request id (that was generated # register the brokerd request id (that was generated
# / created internally by the broker backend) with our # / created internally by the broker backend) with our
@ -697,91 +657,136 @@ async def translate_and_relay_brokerd_events(
# update the flow with the ack msg # update the flow with the ack msg
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
# no msg to client necessary
continue continue
# a live flow now exists # BrokerdOrderError
oid = entry.oid case {
'name': 'error',
'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id
'symbol': sym,
'broker_details': details,
# 'reason': reason,
}:
msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank?
# TODO: instead this should be our status set. # TODO: figure out how this will interact with EMS clients
# ack, open, fill, closed, cancelled' # for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# This looks like a supervision policy for pending orders on
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
resp = None # BrokerdStatus
broker_details = {} case {
'name': 'status',
'status': status,
'reqid': reqid, # brokerd generated order-request id
# TODO: feels like the wrong msg for this field?
'remaining': remaining,
if name in ( } if (
'error', oid := book._ems2brokerd_ids.inverse.get(reqid)
): ):
# TODO: figure out how this will interact with EMS clients msg = BrokerdStatus(**brokerd_msg)
# for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# This looks like a supervision policy for pending orders on # TODO: should we flatten out these cases and/or should
# some unexpected failure - something we need to think more # they maybe even eventually be separate messages?
# about. In most default situations, with composed orders if status == 'cancelled':
# (ex. brackets), most brokers seem to use a oca policy. log.info(f'Cancellation for {oid} is complete!')
msg = BrokerdError(**brokerd_msg) if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not remaining:
# XXX should we make one when it's blank? resp = 'broker_executed'
log.error(pformat(msg))
# TODO: getting this bs, prolly need to handle status messages # be sure to pop this stream from our dialogue set
# 'Market data farm connection is OK:usfarm.nj' # since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!')
# another stupid ib error to handle # just log it
# if 10147 in message: cancel else:
log.info(f'{broker} filled {msg}')
resp = 'broker_errored'
broker_details = msg
# don't relay message to order requester client
# continue
elif name in (
'status',
):
msg = BrokerdStatus(**brokerd_msg)
if msg.status == 'cancelled':
log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not msg.remaining:
resp = 'broker_executed'
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!')
# just log it
else: else:
log.info(f'{broker} filled {msg}') # one of {submitted, cancelled}
resp = 'broker_' + msg.status
else: # BrokerdFill
# one of {submitted, cancelled} case {
resp = 'broker_' + msg.status 'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg)
resp = 'broker_filled'
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
# pass the BrokerdStatus msg inside the broker details field # unknown valid message case?
broker_details = msg # case {
# 'name': name,
# 'symbol': sym,
# 'reqid': reqid, # brokerd generated order-request id
# # 'oid': oid, # ems order-dialog id
# 'broker_details': details,
elif name in ( # } if (
'fill', # book._ems2brokerd_ids.inverse.get(reqid) is None
): # ):
msg = BrokerdFill(**brokerd_msg) # # TODO: pretty sure we can drop this now?
# proxy through the "fill" result(s) # # XXX: paper clearing special cases
resp = 'broker_filled' # # paper engine race case: ``Client.submit_limit()`` hasn't
broker_details = msg # # returned yet and provided an output reqid to register
# # locally, so we need to retreive the oid that was already
# # packed at submission since we already know it ahead of
# # time
# paper = details.get('paper_info')
# ext = details.get('external')
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') # if paper:
# # paperboi keeps the ems id up front
# oid = paper['oid']
else: # elif ext:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid') # # may be an order msg specified as "external" to the
# # piker ems flow (i.e. generated by some other
# # external broker backend client (like tws for ib)
# log.error(f"External trade event {name}@{ext}")
# else:
# # something is out of order, we don't have an oid for
# # this broker-side message.
# log.error(
# f'Unknown oid: {oid} for msg {name}:\n'
# f'{pformat(brokerd_msg)}\n'
# 'Unable to relay message to client side!?'
# )
# continue
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow
entry = book._ems_entries[oid]
assert entry.oid == oid
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# Create and relay response status message # Create and relay response status message
# to requesting EMS client # to requesting EMS client
@ -793,7 +798,7 @@ async def translate_and_relay_brokerd_events(
resp=resp, resp=resp,
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_reqid=reqid, broker_reqid=reqid,
brokerd_msg=broker_details, brokerd_msg=msg,
) )
) )
except KeyError: except KeyError:
@ -808,11 +813,11 @@ async def translate_and_relay_brokerd_events(
async def process_client_order_cmds( async def process_client_order_cmds(
client_order_stream: tractor.MsgStream, # noqa client_order_stream: tractor.MsgStream,
brokerd_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream,
symbol: str, symbol: str,
feed: Feed, # noqa feed: Feed,
dark_book: _DarkBook, dark_book: _DarkBook,
router: Router, router: Router,
@ -822,34 +827,24 @@ async def process_client_order_cmds(
# cmd: dict # cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}') log.info(f'Received order cmd:\n{pformat(cmd)}')
action = cmd['action']
oid = cmd['oid'] oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id # register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be # such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory # routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs). # others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid) live_entry = dark_book._ems_entries.get(oid)
# TODO: can't wait for this stuff to land in 3.10 match cmd:
# https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings # existing live-broker order cancel
if action in ('cancel',): case {
'action': 'cancel',
# check for live-broker order 'oid': oid,
if live_entry: } if live_entry:
reqid = live_entry.reqid reqid = live_entry.reqid
msg = BrokerdCancel( msg = BrokerdCancel(
oid=oid, oid=oid,
reqid=reqid, reqid=reqid,
@ -860,12 +855,10 @@ async def process_client_order_cmds(
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
if reqid is not None: if reqid is not None:
# send cancel to brokerd immediately! # send cancel to brokerd immediately!
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg)
else: else:
@ -876,7 +869,10 @@ async def process_client_order_cmds(
dark_book._ems_entries[oid] = msg dark_book._ems_entries[oid] = msg
# dark trigger cancel # dark trigger cancel
else: case {
'action': 'cancel',
'oid': oid,
} if not live_entry:
try: try:
# remove from dark book clearing # remove from dark book clearing
dark_book.orders[symbol].pop(oid, None) dark_book.orders[symbol].pop(oid, None)
@ -896,25 +892,27 @@ async def process_client_order_cmds(
except KeyError: except KeyError:
log.exception(f'No dark order for {symbol}?') log.exception(f'No dark order for {symbol}?')
# TODO: 3.10 struct-pattern matching and unpacking here # live order submission
elif action in ('alert', 'buy', 'sell',): case {
'oid': oid,
'symbol': fqsn,
'price': trigger_price,
'size': size,
'action': ('buy' | 'sell') as action,
'exec_mode': 'live',
}:
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
req = Order(**cmd)
broker = req.brokers[0]
msg = Order(**cmd) # remove the broker part before creating a message
# to send to the specific broker since they probably
fqsn = msg.symbol # aren't expectig their own name, but should they?
trigger_price = msg.price sym = fqsn.replace(f'.{broker}', '')
size = msg.size
exec_mode = msg.exec_mode
broker = msg.brokers[0]
# remove the broker part before creating a message
# to send to the specific broker since they probably
# aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '')
if exec_mode == 'live' and action in ('buy', 'sell',):
if live_entry is not None: if live_entry is not None:
# sanity check on emsd id # sanity check on emsd id
assert live_entry.oid == oid assert live_entry.oid == oid
reqid = live_entry.reqid reqid = live_entry.reqid
@ -934,7 +932,7 @@ async def process_client_order_cmds(
action=action, action=action,
price=trigger_price, price=trigger_price,
size=size, size=size,
account=msg.account, account=req.account,
) )
# send request to backend # send request to backend
@ -954,12 +952,22 @@ async def process_client_order_cmds(
# that live order asap. # that live order asap.
dark_book._ems_entries[oid] = msg dark_book._ems_entries[oid] = msg
# "DARK" triggers # dark-order / alert submission
# submit order to local EMS book and scan loop, case {
# effectively a local clearing engine, which 'oid': oid,
# scans for conditions and triggers matching executions 'symbol': fqsn,
elif exec_mode in ('dark', 'paper') or ( 'price': trigger_price,
action in ('alert') 'size': size,
'exec_mode': exec_mode,
'action': action,
'brokers': brokers, # list
} if (
# "DARK" triggers
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
exec_mode in ('dark', 'paper')
or action == 'alert'
): ):
# Auto-gen scanner predicate: # Auto-gen scanner predicate:
# we automatically figure out what the alert check # we automatically figure out what the alert check
@ -977,6 +985,7 @@ async def process_client_order_cmds(
pred = mk_check(trigger_price, last, action) pred = mk_check(trigger_price, last, action)
spread_slap: float = 5 spread_slap: float = 5
sym = fqsn.replace(f'.{brokers[0]}', '')
min_tick = feed.symbols[sym].tick_size min_tick = feed.symbols[sym].tick_size
if action == 'buy': if action == 'buy':
@ -999,10 +1008,8 @@ async def process_client_order_cmds(
abs_diff_away = 0 abs_diff_away = 0
# submit execution/order to EMS scan loop # submit execution/order to EMS scan loop
# NOTE: this may result in an override of an existing # NOTE: this may result in an override of an existing
# dark book entry if the order id already exists # dark book entry if the order id already exists
dark_book.orders.setdefault( dark_book.orders.setdefault(
fqsn, {} fqsn, {}
)[oid] = ( )[oid] = (
@ -1029,17 +1036,16 @@ async def process_client_order_cmds(
@tractor.context @tractor.context
async def _emsd_main( async def _emsd_main(
ctx: tractor.Context, ctx: tractor.Context,
fqsn: str, fqsn: str,
exec_mode: str, # ('paper', 'live') exec_mode: str, # ('paper', 'live')
loglevel: str = 'info', loglevel: str = 'info',
) -> None: ) -> None:
'''EMS (sub)actor entrypoint providing the '''
execution management (micro)service which conducts broker EMS (sub)actor entrypoint providing the execution management
order clearing control on behalf of clients. (micro)service which conducts broker order clearing control on
behalf of clients.
This is the daemon (child) side routine which starts an EMS runtime This is the daemon (child) side routine which starts an EMS runtime
task (one per broker-feed) and and begins streaming back alerts from task (one per broker-feed) and and begins streaming back alerts from
@ -1083,9 +1089,8 @@ async def _emsd_main(
# tractor.Context instead of strictly requiring a ctx arg. # tractor.Context instead of strictly requiring a ctx arg.
ems_ctx = ctx ems_ctx = ctx
feed: Feed
# spawn one task per broker feed # spawn one task per broker feed
feed: Feed
async with ( async with (
maybe_open_feed( maybe_open_feed(
[fqsn], [fqsn],

View File

@ -312,50 +312,54 @@ async def simulate_fills(
# this stream may eventually contain multiple symbols # this stream may eventually contain multiple symbols
async for quotes in quote_stream: async for quotes in quote_stream:
for sym, quote in quotes.items(): for sym, quote in quotes.items():
for tick in iterticks( for tick in iterticks(
quote, quote,
# dark order price filter(s) # dark order price filter(s)
types=('ask', 'bid', 'trade', 'last') types=('ask', 'bid', 'trade', 'last')
): ):
# print(tick) # print(tick)
tick_price = tick.get('price') match tick:
ttype = tick['type'] case {
'price': tick_price,
'type': 'ask',
}:
client.last_ask = (
tick_price,
tick.get('size', client.last_ask[1]),
)
if ttype in ('ask',): orders = client._buys.get(sym, {})
book_sequence = reversed(
sorted(orders.keys(), key=itemgetter(1)))
client.last_ask = ( def pred(our_price):
tick_price, return tick_price <= our_price
tick.get('size', client.last_ask[1]),
)
orders = client._buys.get(sym, {}) case {
'price': tick_price,
'type': 'bid',
}:
client.last_bid = (
tick_price,
tick.get('size', client.last_bid[1]),
)
orders = client._sells.get(sym, {})
book_sequence = sorted(
orders.keys(),
key=itemgetter(1)
)
book_sequence = reversed( def pred(our_price):
sorted(orders.keys(), key=itemgetter(1))) return tick_price >= our_price
def pred(our_price): case {
return tick_price < our_price 'price': tick_price,
'type': ('trade' | 'last'),
elif ttype in ('bid',): }:
# TODO: simulate actual book queues and our orders
client.last_bid = ( # place in it, might require full L2 data?
tick_price, continue
tick.get('size', client.last_bid[1]),
)
orders = client._sells.get(sym, {})
book_sequence = sorted(orders.keys(), key=itemgetter(1))
def pred(our_price):
return tick_price > our_price
elif ttype in ('trade', 'last'):
# TODO: simulate actual book queues and our orders
# place in it, might require full L2 data?
continue
# iterate book prices descending # iterate book prices descending
for oid, our_price in book_sequence: for oid, our_price in book_sequence: