Support multi-client order-dialog management

This patch was originally meant to fix a bug where new clients which
re-connected to an `emsd` running a paper engine were not getting
updates for new fills and/or cancels. It turns out the solution is
more general: now, any client that creates an order dialog is
subscribed to receive updates for the order-flow set mapped to that
symbol/instrument, as long as the client has registered for that
particular fqsn with the EMS. This means re-connecting clients as
well as "monitoring" clients can see the same orders, alerts, fills
and clears.
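
In rough sketch form the registration flow now works like this (a
condensed, standalone illustration of the hunks below; the concrete
oid/fqsn values and the stand-in stream objects are made up, the table
names are the ones this patch introduces):

```python
from collections import defaultdict

# stand-ins for real `tractor.MsgStream` client streams
client_order_stream = object()
reconnecting_client_stream = object()

dialogs: defaultdict[str, list] = defaultdict(list)            # oid -> client streams
fqsn2dialogs: defaultdict[str, list[str]] = defaultdict(list)  # fqsn -> oids

oid, fqsn = 'uuid-1234', 'xbtusdt.kraken'  # hypothetical example values

# on each order cmd: register the submitting client's stream under the
# dialog's oid and remember that oid under its fqsn
subs = dialogs[oid]
if client_order_stream not in subs:
    subs.append(client_order_stream)
fqsn2dialogs[fqsn].append(oid)

# on any later client (re-)connect for the same fqsn: subscribe it to
# every dialog already known for that fqsn
for known_oid in fqsn2dialogs[fqsn]:
    dialogs[known_oid].append(reconnecting_client_stream)
```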

Impl details:
- change all var names spelled as `dialogues` -> `dialogs` to be
  murican.
- make `Router.dialogs: defaultdict[str, list[tractor.MsgStream]]` so
  that each dialog id (oid) maps to the list of subscribing ems client
  streams.
- add `Router.fqsn2dialogs: defaultdict[str, list[str]]`, a map of fqsn
  entries to lists of oids.
- adjust all core task code to make appropriate lookups into these 2 new
  tables instead of being handed specific client streams as input.
- start the `translate_and_relay_brokerd_events` task as a daemon task
  that lives with the particular `TradesRelay` such that dialogs cleared
  while no client is connected are still processed.
- rename `TradesRelay.brokerd_dialogue` -> `.brokerd_stream`
- broadcast all status msgs to all subscribed clients in the relay loop.
- always de-reg each client stream from the `Router.dialogs` table on close.
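
For reference, the two ends of this now look roughly like the
following (a condensed sketch of the diff below; the `broadcast` and
`unsubscribe` helper names are only for illustration, the real logic
lives inline in the relay loop and the `_emsd_main` teardown):

```python
async def broadcast(router, oid: str, status_msg) -> None:
    # relay loop: fan each status msg out to every client stream
    # currently registered for this dialog (oid), instead of to a
    # single hard-wired client stream
    for stream in router.dialogs[oid]:
        await stream.send(status_msg)

def unsubscribe(router, client_stream) -> None:
    # client teardown: drop only the closing stream from each dialog it
    # was watching; dialogs left without subscribers stay alive in the
    # EMS and keep being serviced by the daemonized relay task
    for oid, client_streams in router.dialogs.items():
        if client_stream in client_streams:
            client_streams.remove(client_stream)
```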
multi_client_order_mgt
Tyler Goodlet 2022-09-30 16:28:50 -04:00
parent cf835b97ca
commit 909e068121
1 changed file with 102 additions and 88 deletions


@@ -18,7 +18,11 @@
 In da suit parlances: "Execution management systems"

 """
-from collections import defaultdict, ChainMap
+from __future__ import annotations
+from collections import (
+    defaultdict,
+    ChainMap,
+)
 from contextlib import asynccontextmanager
 from math import isnan
 from pprint import pformat
@@ -134,12 +138,6 @@ class _DarkBook(Struct):
     # _ems_entries: dict[str, str] = {}
     _active: dict = {}

-    # mapping of ems dialog ids to msg flow history
-    _msgflows: defaultdict[
-        int,
-        ChainMap[dict[str, dict]],
-    ] = defaultdict(ChainMap)
-
     _ems2brokerd_ids: dict[str, str] = bidict()
@@ -152,8 +150,8 @@ _DEFAULT_SIZE: float = 1.0
 async def clear_dark_triggers(
+    router: Router,
     brokerd_orders_stream: tractor.MsgStream,
-    ems_client_order_stream: tractor.MsgStream,
     quote_stream: tractor.ReceiveMsgStream,  # noqa
     broker: str,
     fqsn: str,
@@ -288,15 +286,16 @@ async def clear_dark_triggers(
                     book._active[oid] = status

                     # send response to client-side
-                    try:
-                        await ems_client_order_stream.send(status)
-                    except (
-                        trio.ClosedResourceError,
-                    ):
-                        log.warning(
-                            f'{ems_client_order_stream} stream broke?'
-                        )
-                        break
+                    for client_stream in router.dialogs[oid]:
+                        try:
+                            await client_stream.send(status)
+                        except (
+                            trio.ClosedResourceError,
+                        ):
+                            log.warning(
+                                f'{client_stream} stream broke?'
+                            )
+                            break

             else:  # condition scan loop complete
                 log.debug(f'execs are {execs}')
@@ -310,7 +309,7 @@ class TradesRelay(Struct):
     # for now we keep only a single connection open with
     # each ``brokerd`` for simplicity.
-    brokerd_dialogue: tractor.MsgStream
+    brokerd_stream: tractor.MsgStream

     # map of symbols to dicts of accounts to pp msgs
     positions: dict[
@@ -342,13 +341,28 @@ class Router(Struct):
     # order id to client stream map
     clients: set[tractor.MsgStream] = set()

-    dialogues: dict[
-        str,
-        list[tractor.MsgStream]
-    ] = {}
-
-    # brokername to trades-dialogues streams with ``brokerd`` actors
-    relays: dict[str, TradesRelay] = {}
+    fqsn2dialogs: defaultdict[
+        str,  # fqsn
+        list[str],  # oids
+    ] = defaultdict(list)
+
+    dialogs: defaultdict[
+        str,  # ems uuid (oid)
+        list[tractor.MsgStream]  # client side msg stream
+    ] = defaultdict(list)
+
+    # mapping of ems dialog ids to msg flow history
+    msgflows: defaultdict[
+        str,
+        ChainMap[dict[str, dict]],
+    ] = defaultdict(ChainMap)
+
+    # brokername to trades-dialogs streams with ``brokerd`` actors
+    relays: dict[
+        str,  # broker name
+        TradesRelay,
+    ] = {}

     def get_dark_book(
         self,
@@ -373,7 +387,8 @@ class Router(Struct):
         none already exists.

         '''
-        relay: TradesRelay = self.relays.get(feed.mod.name)
+        broker = feed.mod.name
+        relay: TradesRelay = self.relays.get(broker)

         if (
             relay is None
@@ -387,7 +402,7 @@
         ):
             relay = await self.nursery.start(
-                open_brokerd_trades_dialogue,
+                open_brokerd_trades_dialog,
                 self,
                 feed,
                 symbol,
@@ -395,18 +410,23 @@
                 loglevel,
             )

+            self.nursery.start_soon(
+                translate_and_relay_brokerd_events,
+                broker,
+                relay.brokerd_stream,
+                self,
+            )
+
         relay.consumers += 1

         # TODO: get updated positions here?
-        assert relay.brokerd_dialogue
+        assert relay.brokerd_stream
         try:
             yield relay

         finally:
             # TODO: what exactly needs to be torn down here or
             # are we just consumer tracking?
             relay.consumers -= 1

     async def client_broadcast(
@@ -429,7 +449,7 @@
 _router: Router = None


-async def open_brokerd_trades_dialogue(
+async def open_brokerd_trades_dialog(
     router: Router,
     feed: Feed,
@@ -505,7 +525,7 @@ async def open_brokerd_trades_dialogue(
         # we cache the relay task and instead of running multiple
         # tasks (which will result in multiples of the same msg being
         # relayed for each EMS client) we just register each client
-        # stream to this single relay loop using _router.dialogues
+        # stream to this single relay loop in the dialog table.

         # begin processing order events from the target brokerd backend
         # by receiving order submission response messages,
@@ -532,7 +552,7 @@ async def open_brokerd_trades_dialogue(
         ).append(msg)

         relay = TradesRelay(
-            brokerd_dialogue=brokerd_trades_stream,
+            brokerd_stream=brokerd_trades_stream,
             positions=pps,
             accounts=accounts,
             consumers=1,
@@ -550,8 +570,8 @@
             await trio.sleep_forever()

         finally:
-            # parent context must have been closed
-            # remove from cache so next client will respawn if needed
+            # parent context must have been closed remove from cache so
+            # next client will respawn if needed
             relay = _router.relays.pop(broker, None)
             if not relay:
                 log.warning(f'Relay for {broker} was already removed!?')
@@ -608,7 +628,7 @@ async def translate_and_relay_brokerd_events(
     book: _DarkBook = router.get_dark_book(broker)
     relay: TradesRelay = router.relays[broker]

-    assert relay.brokerd_dialogue == brokerd_trades_stream
+    assert relay.brokerd_stream == brokerd_trades_stream

     brokerd_msg: dict[str, Any]
     async for brokerd_msg in brokerd_trades_stream:
@@ -707,11 +727,14 @@
                 # some unexpected failure - something we need to think more
                 # about.  In most default situations, with composed orders
                 # (ex. brackets), most brokers seem to use a oca policy.
-                ems_client_order_stream = router.dialogues[oid]
+                ems_client_order_streams = router.dialogs[oid]
                 status_msg.resp = 'error'
                 status_msg.brokerd_msg = msg
                 book._active[oid] = status_msg
-                await ems_client_order_stream.send(status_msg)
+
+                for stream in ems_client_order_streams:
+                    await stream.send(status_msg)

             # BrokerdStatus
             case {
@@ -732,15 +755,15 @@
                 # TODO: maybe pack this into a composite type that
                 # contains both the IPC stream as well the
                 # msg-chain/dialog.
-                ems_client_order_stream = router.dialogues.get(oid)
+                ems_client_order_streams = router.dialogs[oid]
                 status_msg = book._active.get(oid)

                 if (
-                    not ems_client_order_stream
+                    not ems_client_order_streams
                     or not status_msg
                 ):
                     log.warning(
-                        f'Received status for unknown dialog {oid}:\n'
+                        f'Received status for untracked dialog {oid}:\n'
                         f'{fmsg}'
                     )
                     continue
@@ -759,7 +782,9 @@
                 status_msg.reqid = reqid  # THIS LINE IS CRITICAL!
                 status_msg.brokerd_msg = msg
                 status_msg.src = msg.broker_details['name']
-                await ems_client_order_stream.send(status_msg)
+
+                for stream in ems_client_order_streams:
+                    await stream.send(status_msg)

                 if status == 'closed':
                     log.info(f'Execution for {oid} is complete!')
@@ -793,7 +818,7 @@
                 msg = BrokerdFill(**brokerd_msg)
                 log.info(f'Fill for {oid} cleared with:\n{fmsg}')

-                ems_client_order_stream = router.dialogues[oid]
+                ems_client_order_streams = router.dialogs[oid]

                 # XXX: bleh, a fill can come after 'closed' from `ib`?
                 # only send a late fill event we haven't already closed
@@ -803,7 +828,10 @@
                     status_msg.resp = 'fill'
                     status_msg.reqid = reqid
                     status_msg.brokerd_msg = msg
-                    await ems_client_order_stream.send(status_msg)
+
+                    for stream in ems_client_order_streams:
+                        await stream.send(status_msg)
+                        # await ems_client_order_stream.send(status_msg)

             # ``Status`` containing an embedded order msg which
             # should be loaded as a "pre-existing open order" from the
@@ -903,11 +931,6 @@
         # if status_msg is not None:
         #     del status_msg

-        # TODO: do we want this to keep things cleaned up?
-        # it might require a special status from brokerd to affirm the
-        # flow is complete?
-        # router.dialogues.pop(oid)
-

async def process_client_order_cmds(
@@ -921,7 +944,7 @@ async def process_client_order_cmds(
 ) -> None:

-    client_dialogues = router.dialogues
+    client_dialogs = router.dialogs

     # cmd: dict
     async for cmd in client_order_stream:
@@ -934,7 +957,11 @@
         # such that translated message from the brokerd backend can be
         # routed (relayed) to **just** that client stream (and in theory
         # others who are registered for such order affiliated msgs).
-        client_dialogues[oid] = client_order_stream
+        subs = client_dialogs[oid]
+        if client_order_stream not in subs:
+            subs.append(client_order_stream)
+
+        router.fqsn2dialogs[symbol].append(oid)

         reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
         # any dark/live status which is current
@@ -1002,8 +1029,9 @@
                     status.req = cmd
                     await client_order_stream.send(status)

-                    # de-register this client dialogue
-                    router.dialogues.pop(oid)
+                    # de-register this order dialogue from all clients
+                    router.dialogs[oid].clear()
+                    router.dialogs.pop(oid)
                     dark_book._active.pop(oid)

                 else:
@@ -1034,8 +1062,8 @@
                 if status is not None:
                     # if we already had a broker order id then
                     # this is likely an order update commmand.
-                    log.info(f"Modifying live {broker} order: {reqid}")
                     reqid = status.reqid
+                    log.info(f"Modifying live {broker} order: {reqid}")
                     status.req = req
                     status.resp = 'pending'
@@ -1252,11 +1280,10 @@ async def _emsd_main(
             loglevel,

         ) as relay,
         trio.open_nursery() as n,
     ):
-
-        brokerd_stream = relay.brokerd_dialogue  # .clone()
+        brokerd_stream = relay.brokerd_stream

         # signal to client that we're started and deliver
         # all known pps and accounts for this ``brokerd``.
@@ -1268,26 +1295,20 @@
         # establish 2-way stream with requesting order-client and
         # begin handling inbound order requests and updates
-        async with ems_ctx.open_stream() as ems_client_order_stream:
+        async with ems_ctx.open_stream() as client_stream:

-            # register the client side before startingn the
+            # register the client side before starting the
             # brokerd-side relay task to ensure the client is
             # delivered all exisiting open orders on startup.
-            _router.clients.add(ems_client_order_stream)
-
-            n.start_soon(
-                translate_and_relay_brokerd_events,
-                broker,
-                brokerd_stream,
-                _router,
-            )
+            _router.clients.add(client_stream)
+            for oid in _router.fqsn2dialogs[fqsn]:
+                _router.dialogs[oid].append(client_stream)

             # trigger scan and exec loop
             n.start_soon(
                 clear_dark_triggers,
+                _router,
                 brokerd_stream,
-                ems_client_order_stream,
                 quote_stream,
                 broker,
                 fqsn,  # form: <name>.<venue>.<suffix>.<broker>
@@ -1295,16 +1316,11 @@
             )

             # start inbound (from attached client) order request processing
+            # main entrypoint, run here until cancelled.
             try:
-                # main entrypoint, run here until cancelled.
                 await process_client_order_cmds(
-
-                    ems_client_order_stream,
-                    # relay.brokerd_dialogue,
+                    client_stream,
                     brokerd_stream,
                     fqsn,
                     feed,
                     dark_book,
@@ -1314,28 +1330,26 @@
             finally:
                 # try to remove client from "registry"
                 try:
-                    _router.clients.remove(ems_client_order_stream)
+                    _router.clients.remove(client_stream)
                 except KeyError:
                     log.warning(
-                        f'Stream {ems_client_order_stream._ctx.chan.uid}'
+                        f'Stream {client_stream._ctx.chan.uid}'
                         ' was already dropped?'
                     )

-                dialogues = _router.dialogues
-
-                for oid, client_stream in dialogues.copy().items():
-                    if client_stream == ems_client_order_stream:
-                        log.warning(
-                            f'client dialogue is being abandoned:\n'
-                            f'{oid} ->\n{client_stream._ctx.chan.uid}'
-                        )
-                        dialogues.pop(oid)
+                dialogs = _router.dialogs
+                for oid, client_streams in dialogs.items():
+                    if client_stream in client_streams:
+                        client_streams.remove(client_stream)

-                        # TODO: for order dialogues left "alive" in
+                        # TODO: for order dialogs left "alive" in
                         # the ems this is where we should allow some
                         # system to take over management. Likely we
                         # want to allow the user to choose what kind
                         # of policy to use (eg. cancel all orders
                         # from client, run some algo, etc.)
+                        if not client_streams:
+                            log.warning(
+                                f'Order dialog is being unmonitored:\n'
+                                f'{oid} ->\n{client_stream._ctx.chan.uid}'
+                            )