Add pub-sub broadcasting

Establishes a more formalized, subscription-based fan-out pattern to ems
clients which subscribe to order flow for a particular symbol (the fqsn
is the default subscription key for now).

Make `Router.client_broadcast()` take a `sub_key: str` value which
determines the set of clients a message is forwarded to, and drop all
such manually defined broadcast loops from task (func) code. Also add
`.get_subs()` which (hackily) returns the set of client streams for
a given sub key, discarding any stream detected as "closed" from the
output. Further, simplify `Router.dialogs` to `defaultdict[str,
set[tractor.MsgStream]]` and add `.subscribers` as a map to sets of
streams, for much easier broadcast management/logic using set
operations inside `.client_broadcast()`.
multi_client_order_mgt
Tyler Goodlet 2022-10-03 12:54:10 -04:00
parent 909e068121
commit 4877af9bc3
1 changed file with 113 additions and 67 deletions
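For reference, the broadcast pattern described in the commit message boils down to roughly the sketch below. It is purely illustrative: `FakeStream` and `MiniRouter` are stand-ins for `tractor.MsgStream` and the real `Router` (which also tracks brokerd relays, dark books, etc.), the `'eurusd.fxcm'` key is just an example fqsn, and a single `RuntimeError` stands in for the `trio.ClosedResourceError`/`BrokenResourceError` cases handled by the actual `client_broadcast()`.

import asyncio
from collections import defaultdict


class FakeStream:
    '''Illustrative stand-in for a client-side IPC msg stream.'''
    def __init__(self, name: str) -> None:
        self.name = name
        self._closed = False

    async def send(self, msg: dict) -> None:
        if self._closed:
            # stand-in for trio.ClosedResourceError/BrokenResourceError
            raise RuntimeError(f'{self.name} is closed')
        print(f'{self.name} <- {msg}')


class MiniRouter:
    '''Sketch of the pub-sub tables and sub-key based fan-out.'''
    def __init__(self) -> None:
        # sub key (the fqsn for now) -> set of unique client streams
        self.subscribers: defaultdict[str, set[FakeStream]] = defaultdict(set)
        # ems order dialog id (oid) -> set of client streams
        self.dialogs: defaultdict[str, set[FakeStream]] = defaultdict(set)

    def get_subs(self, oid: str) -> set[FakeStream]:
        # drop any stream already detected as closed
        return {s for s in self.dialogs[oid] if not s._closed}

    async def client_broadcast(self, sub_key: str, msg: dict) -> None:
        to_remove: set[FakeStream] = set()
        for stream in self.subscribers[sub_key]:
            try:
                await stream.send(msg)
            except RuntimeError:
                to_remove.add(stream)
        # set ops make pruning dead subscribers trivial
        self.subscribers[sub_key].difference_update(to_remove)


async def main() -> None:
    router = MiniRouter()
    a, b = FakeStream('client-a'), FakeStream('client-b')
    router.subscribers['eurusd.fxcm'].update({a, b})

    # both clients receive the update
    await router.client_broadcast('eurusd.fxcm', {'resp': 'open', 'oid': '1'})

    # a "broken" client is silently dropped from the sub set
    b._closed = True
    await router.client_broadcast('eurusd.fxcm', {'resp': 'fill', 'oid': '1'})
    assert b not in router.subscribers['eurusd.fxcm']


if __name__ == '__main__':
    asyncio.run(main())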


@@ -21,7 +21,7 @@ In da suit parlances: "Execution management systems"
 from __future__ import annotations
 from collections import (
     defaultdict,
-    ChainMap,
+    # ChainMap,
 )
 from contextlib import asynccontextmanager
 from math import isnan
@@ -286,16 +286,10 @@ async def clear_dark_triggers(
                 book._active[oid] = status

                 # send response to client-side
-                for client_stream in router.dialogs[oid]:
-                    try:
-                        await client_stream.send(status)
-                    except (
-                        trio.ClosedResourceError,
-                    ):
-                        log.warning(
-                            f'{client_stream} stream broke?'
-                        )
-                        break
+                await router.client_broadcast(
+                    fqsn,
+                    status,
+                )

         else:  # condition scan loop complete
             log.debug(f'execs are {execs}')
@@ -342,21 +336,24 @@ class Router(Struct):
     # order id to client stream map
     clients: set[tractor.MsgStream] = set()

-    fqsn2dialogs: defaultdict[
-        str,  # fqsn
-        list[str],  # oids
-    ] = defaultdict(list)
+    # sets of clients mapped from subscription keys
+    subscribers: defaultdict[
+        str,  # sub key, default fqsn
+        set[tractor.MsgStream],  # unique client streams
+    ] = defaultdict(set)

+    # sets of clients dynamically registered for specific
+    # order flows based on subscription config.
     dialogs: defaultdict[
         str,  # ems uuid (oid)
-        list[tractor.MsgStream]  # client side msg stream
-    ] = defaultdict(list)
+        set[tractor.MsgStream]  # client side msg stream
+    ] = defaultdict(set)

-    # mapping of ems dialog ids to msg flow history
-    msgflows: defaultdict[
-        str,
-        ChainMap[dict[str, dict]],
-    ] = defaultdict(ChainMap)
+    # TODO: mapping of ems dialog ids to msg flow history
+    # msgflows: defaultdict[
+    #     str,
+    #     ChainMap[dict[str, dict]],
+    # ] = defaultdict(ChainMap)

     # brokername to trades-dialogs streams with ``brokerd`` actors
     relays: dict[
@@ -372,6 +369,20 @@ class Router(Struct):
         return self.books.setdefault(brokername, _DarkBook(brokername))

+    def get_subs(
+        self,
+        oid: str,
+
+    ) -> set[tractor.MsgStream]:
+        '''
+        Deliver list of non-closed subscriber client msg streams.
+
+        '''
+        return set(
+            stream for stream in self.dialogs[oid]
+            if not stream._closed
+        )
+
     @asynccontextmanager
     async def maybe_open_brokerd_trades_dialogue(
         self,
@@ -431,20 +442,27 @@
     async def client_broadcast(
         self,
+        sub_key: str,
         msg: dict,

     ) -> None:
-        for client_stream in self.clients.copy():
+        to_remove: set[tractor.MsgStream] = set()
+        subs = self.subscribers[sub_key]
+        for client_stream in subs:
             try:
                 await client_stream.send(msg)
             except (
                 trio.ClosedResourceError,
                 trio.BrokenResourceError,
             ):
+                to_remove.add(client_stream)
                 self.clients.remove(client_stream)
                 log.warning(
                     f'client for {client_stream} was already closed?')

+        if to_remove:
+            subs.difference_update(to_remove)
+

 _router: Router = None
@@ -558,7 +576,7 @@ async def open_brokerd_trades_dialog(
             consumers=1,
         )

-        _router.relays[broker] = relay
+        router.relays[broker] = relay

         # the ems scan loop may be cancelled by the client but we
         # want to keep the ``brokerd`` dialogue up regardless
@@ -572,7 +590,7 @@ async def open_brokerd_trades_dialog(
     finally:
         # parent context must have been closed remove from cache so
         # next client will respawn if needed
-        relay = _router.relays.pop(broker, None)
+        relay = router.relays.pop(broker, None)
         if not relay:
             log.warning(f'Relay for {broker} was already removed!?')
@@ -627,7 +645,6 @@ async def translate_and_relay_brokerd_events(
     '''
     book: _DarkBook = router.get_dark_book(broker)
     relay: TradesRelay = router.relays[broker]
-
     assert relay.brokerd_stream == brokerd_trades_stream

     brokerd_msg: dict[str, Any]
@@ -660,7 +677,7 @@ async def translate_and_relay_brokerd_events(
                 # fan-out-relay position msgs immediately by
                 # broadcasting updates on all client streams
-                await router.client_broadcast(pos_msg)
+                await router.client_broadcast(sym, pos_msg)
                 continue

             # BrokerdOrderAck
@@ -727,21 +744,18 @@ async def translate_and_relay_brokerd_events(
                 # some unexpected failure - something we need to think more
                 # about. In most default situations, with composed orders
                 # (ex. brackets), most brokers seem to use a oca policy.
-                ems_client_order_streams = router.dialogs[oid]

                 status_msg.resp = 'error'
                 status_msg.brokerd_msg = msg
                 book._active[oid] = status_msg
-
-                for stream in ems_client_order_streams:
-                    await stream.send(status_msg)
+                await router.client_broadcast(sym, status_msg)

             # BrokerdStatus
             case {
                 'name': 'status',
                 'status': status,
                 'reqid': reqid,  # brokerd generated order-request id
             } if (
                 (oid := book._ems2brokerd_ids.inverse.get(reqid))
                 and status in (
@@ -755,7 +769,7 @@ async def translate_and_relay_brokerd_events(
                 # TODO: maybe pack this into a composite type that
                 # contains both the IPC stream as well the
                 # msg-chain/dialog.
-                ems_client_order_streams = router.dialogs[oid]
+                ems_client_order_streams = router.get_subs(oid)
                 status_msg = book._active.get(oid)

                 if (
@@ -783,8 +797,10 @@ async def translate_and_relay_brokerd_events(
                 status_msg.brokerd_msg = msg
                 status_msg.src = msg.broker_details['name']

-                for stream in ems_client_order_streams:
-                    await stream.send(status_msg)
+                await router.client_broadcast(
+                    status_msg.req.symbol,
+                    status_msg,
+                )

                 if status == 'closed':
                     log.info(f'Execution for {oid} is complete!')
@@ -818,8 +834,6 @@ async def translate_and_relay_brokerd_events(
                 msg = BrokerdFill(**brokerd_msg)
                 log.info(f'Fill for {oid} cleared with:\n{fmsg}')

-                ems_client_order_streams = router.dialogs[oid]
-
                 # XXX: bleh, a fill can come after 'closed' from `ib`?
                 # only send a late fill event we haven't already closed
                 # out the dialog status locally.
@@ -829,9 +843,10 @@ async def translate_and_relay_brokerd_events(
                     status_msg.reqid = reqid
                     status_msg.brokerd_msg = msg

-                    for stream in ems_client_order_streams:
-                        await stream.send(status_msg)
-                        # await ems_client_order_stream.send(status_msg)
+                    await router.client_broadcast(
+                        status_msg.req.symbol,
+                        status_msg,
+                    )

             # ``Status`` containing an embedded order msg which
             # should be loaded as a "pre-existing open order" from the
@@ -883,7 +898,10 @@ async def translate_and_relay_brokerd_events(
                 # fan-out-relay position msgs immediately by
                 # broadcasting updates on all client streams
-                await router.client_broadcast(status_msg)
+                await router.client_broadcast(
+                    order.symbol,
+                    status_msg,
+                )

                 # don't fall through
                 continue
@@ -937,15 +955,21 @@ async def process_client_order_cmds(
     client_order_stream: tractor.MsgStream,
     brokerd_order_stream: tractor.MsgStream,

-    symbol: str,
+    fqsn: str,
     feed: Feed,
     dark_book: _DarkBook,
     router: Router,

 ) -> None:
-
-    client_dialogs = router.dialogs
-
+    '''
+    Client-dialog request loop: accept order requests and deliver
+    initial status msg responses to subscribed clients.
+
+    This task-loop handles both management of dark triggered orders and
+    alerts by inserting them into the "dark book"-table as well as
+    submitting live orders immediately if requested by the client.
+
+    '''
     # cmd: dict
     async for cmd in client_order_stream:
         log.info(f'Received order cmd:\n{pformat(cmd)}')
@@ -953,15 +977,17 @@ async def process_client_order_cmds(
         # CAWT DAMN we need struct support!
         oid = str(cmd['oid'])

-        # register this stream as an active dialogue for this order id
-        # such that translated message from the brokerd backend can be
-        # routed (relayed) to **just** that client stream (and in theory
-        # others who are registered for such order affiliated msgs).
-        subs = client_dialogs[oid]
-        if client_order_stream not in subs:
-            subs.append(client_order_stream)
+        # register this stream as an active order dialog (msg flow) for
+        # this order id such that translated message from the brokerd
+        # backend can be routed and relayed to subscribed clients.
+        subs = router.dialogs[oid]

-        router.fqsn2dialogs[symbol].append(oid)
+        # add all subscribed clients for this fqsn (should eventually be
+        # a more generalize subscription system) to received order msg
+        # updates (and thus show stuff in the UI).
+        subs.add(client_order_stream)
+        subs.update(router.subscribers[fqsn])
+
         reqid = dark_book._ems2brokerd_ids.inverse.get(oid)

         # any dark/live status which is current
@@ -973,7 +999,7 @@ async def process_client_order_cmds(
                 'action': 'cancel',
                 'oid': oid,
             } if (
-                (status := dark_book._active.get(oid))
+                status
                 and status.resp in ('open', 'pending')
             ):
                 reqid = status.reqid
@@ -1009,11 +1035,11 @@ async def process_client_order_cmds(
                 'action': 'cancel',
                 'oid': oid,
             } if (
-                status and status.resp == 'dark_open'
-                # or status and status.req
+                status
+                and status.resp == 'dark_open'
             ):
                 # remove from dark book clearing
-                entry = dark_book.orders[symbol].pop(oid, None)
+                entry = dark_book.orders[fqsn].pop(oid, None)
                 if entry:
                     (
                         pred,
@@ -1028,14 +1054,18 @@ async def process_client_order_cmds(
                     status.resp = 'canceled'
                     status.req = cmd

-                    await client_order_stream.send(status)
+                    await router.client_broadcast(
+                        fqsn,
+                        status,
+                    )

                     # de-register this order dialogue from all clients
                     router.dialogs[oid].clear()
                     router.dialogs.pop(oid)
                     dark_book._active.pop(oid)
+
                 else:
-                    log.exception(f'No dark order for {symbol}?')
+                    log.exception(f'No dark order for {fqsn}?')

             # TODO: eventually we should be receiving
             # this struct on the wire unpacked in a scoped protocol
@@ -1194,7 +1224,12 @@ async def process_client_order_cmds(
                     src='dark',
                 )
                 dark_book._active[oid] = status
-                await client_order_stream.send(status)
+
+                # broadcast status to all subscribed clients
+                await router.client_broadcast(
+                    fqsn,
+                    status,
+                )


 @tractor.context
@@ -1220,20 +1255,26 @@ async def _emsd_main(
     received in a stream from that client actor and then responses are
     streamed back up to the original calling task in the same client.

-    The primary ``emsd`` task tree is:
+    The primary ``emsd`` task trees are:

-    - ``_emsd_main()``:
-      sets up brokerd feed, order feed with ems client, trades dialogue with
-      brokderd trading api.
-       |
-        - ``clear_dark_triggers()``:
-          run (dark order) conditions on inputs and trigger brokerd "live"
-          order submissions.
+    - ``_setup_persistent_emsd()``:
+      is the ``emsd`` actor's primary root task which sets up an
+      actor-global ``Router`` instance and starts a relay loop task
+      which lives until the backend broker is shutdown or the ems is
+      terminated.
        |
         - (maybe) ``translate_and_relay_brokerd_events()``:
           accept normalized trades responses from brokerd, process and
           relay to ems client(s); this is a effectively a "trade event
           reponse" proxy-broker.
+
+    - ``_emsd_main()``:
+      attaches a brokerd real-time quote feed and trades dialogue with
+      brokderd trading api for every connecting client.
+       |
+        - ``clear_dark_triggers()``:
+          run (dark order) conditions on inputs and trigger brokerd "live"
+          order submissions.
        |
         - ``process_client_order_cmds()``:
           accepts order cmds from requesting clients, registers dark orders and
@@ -1301,8 +1342,12 @@ async def _emsd_main(
             # brokerd-side relay task to ensure the client is
             # delivered all exisiting open orders on startup.
             _router.clients.add(client_stream)
-            for oid in _router.fqsn2dialogs[fqsn]:
-                _router.dialogs[oid].append(client_stream)
+
+            # TODO: instead of by fqsn we need a subscription
+            # system/schema here to limit what each new client is
+            # allowed to see in terms of broadcasted order flow
+            # updates per dialog.
+            _router.subscribers[fqsn].add(client_stream)

             # trigger scan and exec loop
             n.start_soon(
@@ -1337,6 +1382,7 @@ async def _emsd_main(
                     ' was already dropped?'
                 )

+            _router.subscribers[fqsn].remove(client_stream)
             dialogs = _router.dialogs
             for oid, client_streams in dialogs.items():
                 if client_stream in client_streams: