Avoid multiple `brokerd` trades dialogue flows

This solves a bunch of issues to do with `brokerd` order status msgs
getting relayed for each order to **every** correspondingly connected
EMS client. Previously we weren't keeping track of which emsd orders
were associated with which clients so you had backend msgs getting
broadcast to all clients which not only resulted in duplicate (and
sometimes erroneous, due to state tracking) actions taking place in the
UI's order mode, but it's also just duplicate traffic (usually to the
same actor) over multiple logical streams. Instead, only keep up **one**
(cached) stream with the `trades_dialogue()` endpoint such that **all**
emsd orders route over that single connection to the particular
`brokerd` actor.
minimal_brokerd_trade_dialogues
Tyler Goodlet 2021-06-22 07:48:31 -04:00
parent 0675b1fb10
commit a7e106be96
1 changed files with 229 additions and 143 deletions

View File

@ -18,9 +18,10 @@
In da suit parlances: "Execution management systems" In da suit parlances: "Execution management systems"
""" """
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from pprint import pformat from pprint import pformat
import time import time
from dataclasses import dataclass, field
from typing import AsyncIterator, Callable, Any from typing import AsyncIterator, Callable, Any
from bidict import bidict from bidict import bidict
@ -31,6 +32,7 @@ import tractor
from .. import data from .. import data
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data.feed import Feed
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Status, Order, Status, Order,
@ -254,30 +256,72 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
# TODO: lots of cases still to handle class _Router(BaseModel):
# XXX: right now this is very very ad-hoc to IB '''Order router which manages per-broker dark books, alerts,
# - short-sale but securities haven't been located, in this case we and clearing related data feed management.
# should probably keep the order in some kind of weird state or cancel
# it outright? '''
# status='PendingSubmit', message=''), nursery: trio.Nursery
# status='Cancelled', message='Error 404,
# reqId 1550: Order held while securities are located.'), feeds: dict[tuple[str, str], Any] = {}
# status='PreSubmitted', message='')], books: dict[str, _DarkBook] = {}
dialogues: dict[str, list[tractor.MsgStream]] = {}
relays: dict[str, tuple[dict, tractor.MsgStream]] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def get_dark_book(
self,
brokername: str,
) -> _DarkBook:
return self.books.setdefault(brokername, _DarkBook(brokername))
_router: _Router = None
@tractor.context
async def _setup_persistent_emsd(
ctx: tractor.Context,
) -> None:
global _router
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent orders/execs persistent
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
async def translate_and_relay_brokerd_events( async def translate_and_relay_brokerd_events(
broker: str, broker: str,
ems_client_order_stream: tractor.MsgStream, # ems_client_order_stream: tractor.MsgStream,
brokerd_trades_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream,
book: _DarkBook, book: _DarkBook,
router: _Router,
) -> AsyncIterator[dict]: ) -> AsyncIterator[dict]:
"""Trades update loop - receive updates from broker, convert '''Trades update loop - receive updates from ``brokerd`` trades
to EMS responses, transmit to ordering client(s). endpoint, convert to EMS response msgs, transmit **only** to
ordering client(s).
This is where trade confirmations from the broker are processed This is where trade confirmations from the broker are processed and
and appropriate responses relayed back to the original EMS client appropriate responses relayed **only** back to the original EMS
actor. There is a messaging translation layer throughout. client actor. There is a messaging translation layer throughout.
Expected message translation(s): Expected message translation(s):
@ -286,10 +330,10 @@ async def translate_and_relay_brokerd_events(
'status' -> relabel as 'broker_<status>', if complete send 'executed' 'status' -> relabel as 'broker_<status>', if complete send 'executed'
'fill' -> 'broker_filled' 'fill' -> 'broker_filled'
Currently accepted status values from IB: Currently handled status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'} {'presubmitted', 'submitted', 'cancelled', 'inactive'}
""" '''
async for brokerd_msg in brokerd_trades_stream: async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name'] name = brokerd_msg['name']
@ -298,10 +342,14 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
# relay through position msgs immediately # relay through position msgs immediately by
await ems_client_order_stream.send( # broadcasting updates on all client streams
BrokerdPosition(**brokerd_msg).dict() for oid, ems_client_order_stream in router.dialogues.items():
)
await ems_client_order_stream.send(
BrokerdPosition(**brokerd_msg).dict()
)
continue continue
# Get the broker (order) request id, this **must** be normalized # Get the broker (order) request id, this **must** be normalized
@ -331,7 +379,7 @@ async def translate_and_relay_brokerd_events(
# may be an order msg specified as "external" to the # may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other # piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib) # external broker backend client (like tws for ib)
ext = brokerd_msg.get('external') ext = brokerd_msg['broker_details'].get('external')
if ext: if ext:
log.error(f"External trade event {ext}") log.error(f"External trade event {ext}")
@ -377,6 +425,7 @@ async def translate_and_relay_brokerd_events(
resp = None resp = None
broker_details = {} broker_details = {}
client_flow_complete: bool = False
if name in ( if name in (
'error', 'error',
@ -407,22 +456,13 @@ async def translate_and_relay_brokerd_events(
elif name in ( elif name in (
'status', 'status',
): ):
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# everyone doin camel case
msg = BrokerdStatus(**brokerd_msg) msg = BrokerdStatus(**brokerd_msg)
if msg.status == 'cancelled':
client_flow_complete = True
log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled': if msg.status == 'filled':
# conditional execution is fully complete, no more # conditional execution is fully complete, no more
@ -431,6 +471,9 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_executed' resp = 'broker_executed'
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
client_flow_complete = True
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')
# just log it # just log it
@ -460,6 +503,7 @@ async def translate_and_relay_brokerd_events(
# Create and relay response status message # Create and relay response status message
# to requesting EMS client # to requesting EMS client
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send( await ems_client_order_stream.send(
Status( Status(
oid=oid, oid=oid,
@ -470,6 +514,9 @@ async def translate_and_relay_brokerd_events(
).dict() ).dict()
) )
if client_flow_complete:
router.dialogues.pop(oid)
async def process_client_order_cmds( async def process_client_order_cmds(
@ -477,8 +524,9 @@ async def process_client_order_cmds(
brokerd_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream,
symbol: str, symbol: str,
feed: 'Feed', # noqa feed: Feed, # noqa
dark_book: _DarkBook, dark_book: _DarkBook,
router: _Router,
) -> None: ) -> None:
@ -489,6 +537,16 @@ async def process_client_order_cmds(
action = cmd['action'] action = cmd['action']
oid = cmd['oid'] oid = cmd['oid']
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs).
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
router.dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid) live_entry = dark_book._ems_entries.get(oid)
@ -499,14 +557,17 @@ async def process_client_order_cmds(
# check for live-broker order # check for live-broker order
if live_entry: if live_entry:
reqid = reqid or live_entry.reqid
assert reqid
msg = BrokerdCancel( msg = BrokerdCancel(
oid=oid, oid=oid,
reqid=reqid or live_entry.reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) )
# send cancel to brokerd immediately! # send cancel to brokerd immediately!
log.info("Submitting cancel for live order") log.info("Submitting cancel for live order {reqid}")
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
# from corresponding broker # from corresponding broker
@ -515,7 +576,8 @@ async def process_client_order_cmds(
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when # acked yet by a brokerd, so register a cancel for when
# the order ack does show up later # the order ack does show up later such that the brokerd
# order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg dark_book._ems_entries[oid] = msg
# check for EMS active exec # check for EMS active exec
@ -532,6 +594,8 @@ async def process_client_order_cmds(
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() ).dict()
) )
# de-register this client dialogue
router.dialogues.pop(oid)
except KeyError: except KeyError:
log.exception(f'No dark order for {symbol}?') log.exception(f'No dark order for {symbol}?')
@ -581,17 +645,22 @@ async def process_client_order_cmds(
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg.dict())
# an immediate response should be brokerd ack with order # an immediate response should be ``BrokerdOrderAck``
# id but we register our request as part of the flow # with ems order id from the ``trades_dialogue()``
# endpoint, but we register our request as part of the
# flow so that if a cancel comes from the requesting
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that order with them immediately.
dark_book._ems_entries[oid] = msg dark_book._ems_entries[oid] = msg
# "DARK" triggers
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
elif exec_mode in ('dark', 'paper') or ( elif exec_mode in ('dark', 'paper') or (
action in ('alert') action in ('alert')
): ):
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
# Auto-gen scanner predicate: # Auto-gen scanner predicate:
# we automatically figure out what the alert check # we automatically figure out what the alert check
# condition should be based on the current first # condition should be based on the current first
@ -637,11 +706,11 @@ async def process_client_order_cmds(
percent_away, percent_away,
abs_diff_away abs_diff_away
) )
resp = 'dark_submitted'
# alerts have special msgs to distinguish
if action == 'alert': if action == 'alert':
resp = 'alert_submitted' resp = 'alert_submitted'
else:
resp = 'dark_submitted'
await client_order_stream.send( await client_order_stream.send(
Status( Status(
@ -652,6 +721,97 @@ async def process_client_order_cmds(
) )
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
broker: str,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if broker in _router.relays:
positions, brokerd_trades_stream = _router.relays[broker]
# TODO: get updated positions here?
yield positions, brokerd_trades_stream
return
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
trio.open_nursery() as n,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
n.start_soon(
translate_and_relay_brokerd_events,
broker,
# ems_client_order_stream,
brokerd_trades_stream,
dark_book,
_router,
)
_router.relays[broker] = (positions, brokerd_trades_stream)
yield positions, brokerd_trades_stream
@tractor.context @tractor.context
async def _emsd_main( async def _emsd_main(
@ -697,6 +857,8 @@ async def _emsd_main(
''' '''
global _router global _router
assert _router
dark_book = _router.get_dark_book(broker) dark_book = _router.get_dark_book(broker)
# TODO: would be nice if in tractor we can require either a ctx arg, # TODO: would be nice if in tractor we can require either a ctx arg,
@ -711,8 +873,6 @@ async def _emsd_main(
# spawn one task per broker feed # spawn one task per broker feed
async with ( async with (
trio.open_nursery() as n,
# TODO: eventually support N-brokers # TODO: eventually support N-brokers
data.open_feed( data.open_feed(
broker, broker,
@ -732,39 +892,24 @@ async def _emsd_main(
book = _router.get_dark_book(broker) book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last'] book.lasts[(broker, symbol)] = first_quote[symbol]['last']
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with ( async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream, # only open if one isn't already up: we try to keep
# as few duplicate streams as necessary
maybe_open_brokerd_trades_dialogue(
_router,
feed,
broker,
symbol,
dark_book,
_exec_mode,
loglevel,
) as (positions, brokerd_trades_stream),
trio.open_nursery() as n,
): ):
# signal to client that we're started # signal to client that we're started
# TODO: we could eventually send back **all** brokerd # TODO: we could eventually send back **all** brokerd
# positions here? # positions here?
@ -787,72 +932,13 @@ async def _emsd_main(
book book
) )
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client.
n.start_soon(
translate_and_relay_brokerd_events,
broker,
ems_client_order_stream,
brokerd_trades_stream,
dark_book,
)
# start inbound (from attached client) order request processing # start inbound (from attached client) order request processing
await process_client_order_cmds( await process_client_order_cmds(
ems_client_order_stream, ems_client_order_stream,
brokerd_trades_stream, brokerd_trades_stream,
symbol, symbol,
feed, feed,
dark_book, dark_book,
_router,
) )
class _Router(BaseModel):
'''Order router which manages per-broker dark books, alerts,
and clearing related data feed management.
'''
nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {}
books: dict[str, _DarkBook] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def get_dark_book(
self,
brokername: str,
) -> _DarkBook:
return self.books.setdefault(brokername, _DarkBook(brokername))
_router: _Router = None
@tractor.context
async def _setup_persistent_emsd(
ctx: tractor.Context,
) -> None:
global _router
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent orders/execs persistent
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()