Repair already-open order relay, fix causality dilemma

With the refactor of the dark loop into a daemon task already-open order
relaying from a `brokerd` was broken since no subscribed clients were
registered prior to the relay loop sending status msgs for such existing
live orders. Repair that by adding one more synchronization phase to the
`Router.open_trade_relays()` task: deliver a `client_ready: trio.Event`
which is set by the client task once the client stream has been
established and don't start the `brokerd` order dialog relay loop until
this event is ready.

Further implementation deats:
- factor the `brokerd` relay caching back into it's own `@acm` method:
  `maybe_open_brokerd_dialog()` since we do want (but only this) stream
  singleton-cached per broker backend.
- spawn all relay tasks on every entry for the moment until we figure
  out what we're caching against (any client pre-existing right, which
  would mean there's an entry in the `.subscribers` table?)
- rename `_DarkBook` -> `DarkBook` and `DarkBook.orders` -> `.triggers`
offline_dark_clearing
Tyler Goodlet 2022-10-06 12:39:27 -04:00
parent 1d9ab7b0de
commit 2bc25e3593
1 changed files with 190 additions and 159 deletions

View File

@ -23,6 +23,7 @@ from collections import (
defaultdict, defaultdict,
# ChainMap, # ChainMap,
) )
from contextlib import asynccontextmanager as acm
from math import isnan from math import isnan
from pprint import pformat from pprint import pformat
import time import time
@ -40,6 +41,10 @@ import tractor
from ..log import get_logger from ..log import get_logger
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data._source import (
unpack_fqsn,
mk_fqsn,
)
from ..data.feed import ( from ..data.feed import (
Feed, Feed,
maybe_open_feed, maybe_open_feed,
@ -104,7 +109,7 @@ def mk_check(
) )
class _DarkBook(Struct): class DarkBook(Struct):
''' '''
EMS-trigger execution book. EMS-trigger execution book.
@ -119,7 +124,7 @@ class _DarkBook(Struct):
broker: str broker: str
# levels which have an executable action (eg. alert, order, signal) # levels which have an executable action (eg. alert, order, signal)
orders: dict[ triggers: dict[
str, # symbol str, # symbol
dict[ dict[
str, # uuid str, # uuid
@ -131,14 +136,8 @@ class _DarkBook(Struct):
] ]
] = {} ] = {}
# tracks most recent values per symbol each from data feed lasts: dict[str, float] = {} # quote prices
lasts: dict[ _active: dict[str, Status] = {} # active order dialogs
str,
float,
] = {}
_active: dict = {}
_ems2brokerd_ids: dict[str, str] = bidict() _ems2brokerd_ids: dict[str, str] = bidict()
@ -157,7 +156,7 @@ async def clear_dark_triggers(
broker: str, broker: str,
fqsn: str, fqsn: str,
book: _DarkBook, book: DarkBook,
) -> None: ) -> None:
''' '''
@ -174,7 +173,7 @@ async def clear_dark_triggers(
async for quotes in quote_stream: async for quotes in quote_stream:
# start = time.time() # start = time.time()
for sym, quote in quotes.items(): for sym, quote in quotes.items():
execs = book.orders.get(sym, {}) execs = book.triggers.get(sym, {})
for tick in iterticks( for tick in iterticks(
quote, quote,
# dark order price filter(s) # dark order price filter(s)
@ -292,7 +291,7 @@ async def clear_dark_triggers(
else: # condition scan loop complete else: # condition scan loop complete
log.debug(f'execs are {execs}') log.debug(f'execs are {execs}')
if execs: if execs:
book.orders[fqsn] = execs book.triggers[fqsn] = execs
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
@ -329,7 +328,7 @@ class Router(Struct):
nursery: trio.Nursery nursery: trio.Nursery
# broker to book map # broker to book map
books: dict[str, _DarkBook] = {} books: dict[str, DarkBook] = {}
# sets of clients mapped from subscription keys # sets of clients mapped from subscription keys
subscribers: defaultdict[ subscribers: defaultdict[
@ -360,9 +359,9 @@ class Router(Struct):
self, self,
brokername: str, brokername: str,
) -> _DarkBook: ) -> DarkBook:
return self.books.setdefault(brokername, _DarkBook(brokername)) return self.books.setdefault(brokername, DarkBook(brokername))
def get_subs( def get_subs(
self, self,
@ -378,7 +377,130 @@ class Router(Struct):
if not stream._closed if not stream._closed
) )
async def maybe_open_trade_relays( @acm
async def maybe_open_brokerd_dialog(
self,
feed: Feed,
exec_mode: str,
symbol: str,
loglevel: str,
) -> None:
brokermod = feed.mod
broker = brokermod.name
relay: TradesRelay = self.relays.get(broker)
if (
relay
# We always want to spawn a new relay for the paper
# engine per symbol since we need a new tractor context
# to be opened for every every symbol such that a new
# data feed and ``PaperBoi`` client will be created and
# then used to simulate clearing events.
and exec_mode != 'paper'
):
# deliver already cached instance
yield relay
return
trades_endpoint = getattr(brokermod, 'trades_dialogue', None)
if (
trades_endpoint is None
or exec_mode == 'paper'
):
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running
# a paper-simulator clearing engine.
# load the paper trading engine
exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
fqsn='.'.join([symbol, broker]),
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = feed.portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
# open trades-dialog endpoint with backend broker
positions: list[BrokerdPosition]
accounts: tuple[str]
async with (
open_trades_endpoint as (
brokerd_ctx,
(positions, accounts,),
),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd`
# actor to relay global `brokerd` order events
# unless we're going to expect each backend to
# relay only orders affiliated with a particular
# ``trades_dialogue()`` session (seems annoying
# for implementers). So, here we cache the relay
# task and instead of running multiple tasks
# (which will result in multiples of the same
# msg being relayed for each EMS client) we just
# register each client stream to this single
# relay loop in the dialog table.
# begin processing order events from the target
# brokerd backend by receiving order submission
# response messages, normalizing them to EMS
# messages and relaying back to the piker order
# client set.
# locally cache and track positions per account with
# a table of (brokername, acctid) -> `BrokerdPosition`
# msgs.
pps = {}
for msg in positions:
log.info(f'loading pp: {msg}')
account = msg['account']
# TODO: better value error for this which
# dumps the account and message and states the
# mismatch..
assert account in accounts
pps.setdefault(
(broker, account),
[],
).append(msg)
relay = TradesRelay(
brokerd_stream=brokerd_trades_stream,
positions=pps,
accounts=accounts,
consumers=1,
)
self.relays[broker] = relay
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
try:
yield relay
finally:
# parent context must have been closed remove from cache so
# next client will respawn if needed
relay = self.relays.pop(broker, None)
if not relay:
log.warning(
f'Relay for {broker} was already removed!?')
async def open_trade_relays(
self, self,
fqsn: str, fqsn: str,
exec_mode: str, exec_mode: str,
@ -408,144 +530,48 @@ class Router(Struct):
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
first_quote: dict = feed.first_quotes[fqsn] first_quote: dict = feed.first_quotes[fqsn]
book: _DarkBook = self.get_dark_book(broker) book: DarkBook = self.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last'] book.lasts[fqsn]: float = first_quote['last']
relay: TradesRelay = self.relays.get(broker) async with self.maybe_open_brokerd_dialog(
if ( feed=feed,
relay exec_mode=exec_mode,
symbol=symbol,
loglevel=loglevel,
) as relay:
# dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no
# client is connected.
self.nursery.start_soon(
clear_dark_triggers,
self,
relay.brokerd_stream,
quote_stream,
broker,
fqsn, # form: <name>.<venue>.<suffix>.<broker>
book
)
client_ready = trio.Event()
task_status.started((relay, feed, client_ready))
# sync to the client side by waiting for the stream
# connection setup before relaying any existing live
# orders from the brokerd.
await client_ready.wait()
assert self.subscribers
# spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon.
self.nursery.start_soon(
translate_and_relay_brokerd_events,
broker,
relay.brokerd_stream,
self,
)
# We always want to spawn a new relay for the paper engine
# per symbol since we need a new tractor context to be
# opened for every every symbol such that a new data feed
# and ``PaperBoi`` client will be created and then used to
# simulate clearing events.
and exec_mode != 'paper'
):
task_status.started((relay, feed))
await trio.sleep_forever() await trio.sleep_forever()
return
trades_endpoint = getattr(brokermod, 'trades_dialogue', None)
if (
trades_endpoint is None
or exec_mode == 'paper'
):
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running
# a paper-simulator clearing engine.
# load the paper trading engine
exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
fqsn='.'.join([symbol, broker]),
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = feed.portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
# open trades-dialog endpoint with backend broker
try:
positions: list[BrokerdPosition]
accounts: tuple[str]
async with (
open_trades_endpoint as (
brokerd_ctx,
(positions, accounts,),
),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd`
# actor to relay global `brokerd` order events
# unless we're going to expect each backend to
# relay only orders affiliated with a particular
# ``trades_dialogue()`` session (seems annoying
# for implementers). So, here we cache the relay
# task and instead of running multiple tasks
# (which will result in multiples of the same
# msg being relayed for each EMS client) we just
# register each client stream to this single
# relay loop in the dialog table.
# begin processing order events from the target
# brokerd backend by receiving order submission
# response messages, normalizing them to EMS
# messages and relaying back to the piker order
# client set.
# locally cache and track positions per account with
# a table of (brokername, acctid) -> `BrokerdPosition`
# msgs.
pps = {}
for msg in positions:
log.info(f'loading pp: {msg}')
account = msg['account']
# TODO: better value error for this which
# dumps the account and message and states the
# mismatch..
assert account in accounts
pps.setdefault(
(broker, account),
[],
).append(msg)
relay = TradesRelay(
brokerd_stream=brokerd_trades_stream,
positions=pps,
accounts=accounts,
consumers=1,
)
self.relays[broker] = relay
# spawn a ``brokerd`` order control dialog stream
# that syncs lifetime with the parent `emsd` daemon.
self.nursery.start_soon(
translate_and_relay_brokerd_events,
broker,
relay.brokerd_stream,
self,
)
# dark book clearing loop, also lives with parent
# daemon to allow dark order clearing while no
# client is connected.
self.nursery.start_soon(
clear_dark_triggers,
self,
relay.brokerd_stream,
quote_stream,
broker,
fqsn, # form: <name>.<venue>.<suffix>.<broker>
book
)
task_status.started((relay, feed))
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
await trio.sleep_forever()
finally:
# parent context must have been closed remove from cache so
# next client will respawn if needed
relay = self.relays.pop(broker, None)
if not relay:
log.warning(f'Relay for {broker} was already removed!?')
async def client_broadcast( async def client_broadcast(
self, self,
@ -584,7 +610,7 @@ class Router(Struct):
and notify_on_headless and notify_on_headless
): ):
log.info( log.info(
'No clients attached, firing notification for msg:\n' 'No clients attached, firing notification for {sub_key} msg:\n'
f'{msg}' f'{msg}'
) )
await notify_from_ems_status_msg( await notify_from_ems_status_msg(
@ -645,7 +671,7 @@ async def translate_and_relay_brokerd_events(
{'presubmitted', 'submitted', 'cancelled', 'inactive'} {'presubmitted', 'submitted', 'cancelled', 'inactive'}
''' '''
book: _DarkBook = router.get_dark_book(broker) book: DarkBook = router.get_dark_book(broker)
relay: TradesRelay = router.relays[broker] relay: TradesRelay = router.relays[broker]
assert relay.brokerd_stream == brokerd_trades_stream assert relay.brokerd_stream == brokerd_trades_stream
@ -885,7 +911,11 @@ async def translate_and_relay_brokerd_events(
# use backend request id as our ems id though this # use backend request id as our ems id though this
# may end up with collisions? # may end up with collisions?
status_msg = Status(**brokerd_msg) status_msg = Status(**brokerd_msg)
# NOTE: be sure to pack an fqsn for the client side!
order = Order(**status_msg.req) order = Order(**status_msg.req)
order.symbol = mk_fqsn(broker, order.symbol)
assert order.price and order.size assert order.price and order.size
status_msg.req = order status_msg.req = order
@ -961,7 +991,7 @@ async def process_client_order_cmds(
fqsn: str, fqsn: str,
feed: Feed, feed: Feed,
dark_book: _DarkBook, dark_book: DarkBook,
router: Router, router: Router,
) -> None: ) -> None:
@ -1042,7 +1072,7 @@ async def process_client_order_cmds(
and status.resp == 'dark_open' and status.resp == 'dark_open'
): ):
# remove from dark book clearing # remove from dark book clearing
entry = dark_book.orders[fqsn].pop(oid, None) entry = dark_book.triggers[fqsn].pop(oid, None)
if entry: if entry:
( (
pred, pred,
@ -1204,7 +1234,7 @@ async def process_client_order_cmds(
# submit execution/order to EMS scan loop # submit execution/order to EMS scan loop
# NOTE: this may result in an override of an existing # NOTE: this may result in an override of an existing
# dark book entry if the order id already exists # dark book entry if the order id already exists
dark_book.orders.setdefault( dark_book.triggers.setdefault(
fqsn, {} fqsn, {}
)[oid] = ( )[oid] = (
pred, pred,
@ -1287,7 +1317,6 @@ async def _emsd_main(
global _router global _router
assert _router assert _router
from ..data._source import unpack_fqsn
broker, symbol, suffix = unpack_fqsn(fqsn) broker, symbol, suffix = unpack_fqsn(fqsn)
# TODO: would be nice if in tractor we can require either a ctx arg, # TODO: would be nice if in tractor we can require either a ctx arg,
@ -1298,14 +1327,15 @@ async def _emsd_main(
# spawn one task per broker feed # spawn one task per broker feed
relay: TradesRelay relay: TradesRelay
feed: Feed feed: Feed
client_ready: trio.Event
# open a stream with the brokerd backend for order flow dialogue # open a stream with the brokerd backend for order flow dialogue
# only open if one isn't already up: we try to keep as few duplicate # only open if one isn't already up: we try to keep as few duplicate
# streams as necessary. # streams as necessary.
# TODO: should we try using `tractor.trionics.maybe_open_context()` # TODO: should we try using `tractor.trionics.maybe_open_context()`
# here? # here?
relay, feed = await _router.nursery.start( relay, feed, client_ready = await _router.nursery.start(
_router.maybe_open_trade_relays, _router.open_trade_relays,
fqsn, fqsn,
exec_mode, exec_mode,
loglevel, loglevel,
@ -1333,6 +1363,7 @@ async def _emsd_main(
# allowed to see in terms of broadcasted order flow # allowed to see in terms of broadcasted order flow
# updates per dialog. # updates per dialog.
_router.subscribers[fqsn].add(client_stream) _router.subscribers[fqsn].add(client_stream)
client_ready.set()
# start inbound (from attached client) order request processing # start inbound (from attached client) order request processing
# main entrypoint, run here until cancelled. # main entrypoint, run here until cancelled.