Run dark-clear-loop in daemon task

This enables "headless" dark order matching and clearing where an `emsd`
daemon subactor can be left running with active dark (or other
algorithmic) orders which will still trigger despite there being no
attached-controlling ems-client.

Impl details:
- rename/add `Router.maybe_open_trade_relays()` which now does all the
  work of starting up the ems-side long-living clearing and relay tasks
  and the associated data feed; make it a `Nursery.start()`-able task
  instead of an `@acm` (see the first sketch after this list).
- drop `open_brokerd_trades_dialog()` and move/factor its contents into
  the above method.
- add support for `router.client_broadcast('all', msg)` to wholesale
  fan out a msg to all clients (see the second sketch below).
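
For context, the `Nursery.start()` pattern referenced above works
roughly like this minimal, self-contained sketch; the `Relay` type and
`serve_relay()` names are hypothetical stand-ins, only the `trio` APIs
are real:

    import trio

    class Relay:
        '''Hypothetical stand-in for the real ``TradesRelay``.'''

    async def serve_relay(
        task_status=trio.TASK_STATUS_IGNORED,
    ) -> None:
        relay = Relay()
        # hand the ready relay back to the ``nursery.start()`` caller..
        task_status.started(relay)
        # ..then keep running in the *daemon* nursery so clearing
        # survives even after the requesting client disconnects.
        await trio.sleep_forever()

    async def main() -> None:
        async with trio.open_nursery() as nursery:
            # blocks only until ``task_status.started()`` is called
            relay = await nursery.start(serve_relay)
            assert isinstance(relay, Relay)
            nursery.cancel_scope.cancel()

    trio.run(main)

Unlike an `@acm`, the started task's lifetime is tied to the daemon's
nursery rather than the caller's scope, which is what lets the
dark-clearing loop keep running headless.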
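Likewise, the `'all'` broadcast key boils down to taking the union of
every per-fqsn subscriber set; a toy sketch with strings standing in
for real `tractor.MsgStream`s (the `subscribers` contents here are made
up):

    from collections import defaultdict

    subscribers: defaultdict[str, set] = defaultdict(set)
    subscribers['xbtusdt.kraken'].add('client-a')
    subscribers['eurusd.ib'].add('client-b')

    def resolve_subs(sub_key: str) -> set:
        # mirrors the new key handling in ``Router.client_broadcast()``
        if sub_key == 'all':
            subs = set()
            for s in subscribers.values():
                subs |= s
            return subs
        return subscribers[sub_key]

    assert resolve_subs('all') == {'client-a', 'client-b'}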
offline_dark_clearing
Tyler Goodlet 2022-10-04 21:52:24 -04:00
parent 31b0d8cee8
commit 49433ea87d
1 changed file with 234 additions and 271 deletions


@@ -23,7 +23,6 @@ from collections import (
     defaultdict,
     # ChainMap,
 )
-from contextlib import asynccontextmanager
 from math import isnan
 from pprint import pformat
 import time
@@ -41,9 +40,12 @@ import tractor
 from ..log import get_logger
 from ..data._normalize import iterticks
-from ..data.feed import Feed, maybe_open_feed
+from ..data.feed import (
+    Feed,
+    maybe_open_feed,
+)
 from ..data.types import Struct
-from .._daemon import maybe_spawn_brokerd
+# from .._daemon import maybe_spawn_brokerd
 from . import _paper_engine as paper
 from ._messages import (
     Order,
@@ -135,7 +137,6 @@ class _DarkBook(Struct):
         float,
     ] = {}
 
-    # _ems_entries: dict[str, str] = {}
     _active: dict = {}
     _ems2brokerd_ids: dict[str, str] = bidict()
@@ -247,7 +248,6 @@ async def clear_dark_triggers(
                     await brokerd_orders_stream.send(brokerd_msg)
 
-                    # book._ems_entries[oid] = live_req
                     # book._msgflows[oid].maps.insert(0, live_req)
 
                 case _:
@@ -383,62 +383,174 @@ class Router(Struct):
             if not stream._closed
         )
 
-    @asynccontextmanager
-    async def maybe_open_brokerd_trades_dialogue(
+    async def maybe_open_trade_relays(
         self,
-        feed: Feed,
-        symbol: str,
-        dark_book: _DarkBook,
+        fqsn: str,
         exec_mode: str,
         loglevel: str,
+
+        task_status: TaskStatus[
+            tuple[TradesRelay, Feed]
+        ] = trio.TASK_STATUS_IGNORED,
 
-    ) -> tuple[dict, tractor.MsgStream]:
+    ) -> tuple[TradesRelay, Feed]:
         '''
         Open and yield ``brokerd`` trades dialogue context-stream if
         none already exists.
 
         '''
-        broker = feed.mod.name
-        relay: TradesRelay = self.relays.get(broker)
-
-        if (
-            relay is None
-
-            # We always want to spawn a new relay for the paper engine
-            # per symbol since we need a new tractor context to be
-            # opened for every every symbol such that a new data feed
-            # and ``PaperBoi`` client will be created and then used to
-            # simulate clearing events.
-            or exec_mode == 'paper'
-        ):
-            relay = await self.nursery.start(
-                open_brokerd_trades_dialog,
-                self,
-                feed,
-                symbol,
-                exec_mode,
-                loglevel,
-            )
-
-            self.nursery.start_soon(
-                translate_and_relay_brokerd_events,
-                broker,
-                relay.brokerd_stream,
-                self,
-            )
-
-        relay.consumers += 1
-
-        # TODO: get updated positions here?
-        assert relay.brokerd_stream
-        try:
-            yield relay
-        finally:
-            # TODO: what exactly needs to be torn down here or
-            # are we just consumer tracking?
-            relay.consumers -= 1
+        from ..data._source import unpack_fqsn
+        broker, symbol, suffix = unpack_fqsn(fqsn)
+
+        async with (
+            maybe_open_feed(
+                [fqsn],
+                loglevel=loglevel,
+            ) as (feed, quote_stream),
+        ):
+            brokermod = feed.mod
+            broker = brokermod.name
+
+            # XXX: this should be initial price quote from target provider
+            first_quote: dict = feed.first_quotes[fqsn]
+            book: _DarkBook = self.get_dark_book(broker)
+            book.lasts[fqsn]: float = first_quote['last']
+
+            relay: TradesRelay = self.relays.get(broker)
+            if (
+                relay
+
+                # We always want to spawn a new relay for the paper engine
+                # per symbol since we need a new tractor context to be
+                # opened for every every symbol such that a new data feed
+                # and ``PaperBoi`` client will be created and then used to
+                # simulate clearing events.
+                and exec_mode != 'paper'
+            ):
+                task_status.started((relay, feed))
+                await trio.sleep_forever()
+                return
+
+            trades_endpoint = getattr(brokermod, 'trades_dialogue', None)
+            if (
+                trades_endpoint is None
+                or exec_mode == 'paper'
+            ):
+                # for paper mode we need to mock this trades response feed
+                # so we load bidir stream to a new sub-actor running
+                # a paper-simulator clearing engine.
+
+                # load the paper trading engine
+                exec_mode = 'paper'
+                log.warning(f'Entering paper trading mode for {broker}')
+
+                # load the paper trading engine as a subactor of this emsd
+                # actor to simulate the real IPC load it'll have when also
+                # pulling data from feeds
+                open_trades_endpoint = paper.open_paperboi(
+                    fqsn='.'.join([symbol, broker]),
+                    loglevel=loglevel,
+                )
+
+            else:
+                # open live brokerd trades endpoint
+                open_trades_endpoint = feed.portal.open_context(
+                    trades_endpoint,
+                    loglevel=loglevel,
+                )
+
+            # open trades-dialog endpoint with backend broker
+            try:
+                positions: list[BrokerdPosition]
+                accounts: tuple[str]
+
+                async with (
+                    open_trades_endpoint as (
+                        brokerd_ctx,
+                        (positions, accounts,),
+                    ),
+                    brokerd_ctx.open_stream() as brokerd_trades_stream,
+                ):
+                    # XXX: really we only want one stream per `emsd`
+                    # actor to relay global `brokerd` order events
+                    # unless we're going to expect each backend to
+                    # relay only orders affiliated with a particular
+                    # ``trades_dialogue()`` session (seems annoying
+                    # for implementers). So, here we cache the relay
+                    # task and instead of running multiple tasks
+                    # (which will result in multiples of the same
+                    # msg being relayed for each EMS client) we just
+                    # register each client stream to this single
+                    # relay loop in the dialog table.
+
+                    # begin processing order events from the target
+                    # brokerd backend by receiving order submission
+                    # response messages, normalizing them to EMS
+                    # messages and relaying back to the piker order
+                    # client set.
+
+                    # locally cache and track positions per account with
+                    # a table of (brokername, acctid) -> `BrokerdPosition`
+                    # msgs.
+                    pps = {}
+                    for msg in positions:
+                        log.info(f'loading pp: {msg}')
+
+                        account = msg['account']
+
+                        # TODO: better value error for this which
+                        # dumps the account and message and states the
+                        # mismatch..
+                        assert account in accounts
+
+                        pps.setdefault(
+                            (broker, account),
+                            [],
+                        ).append(msg)
+
+                    relay = TradesRelay(
+                        brokerd_stream=brokerd_trades_stream,
+                        positions=pps,
+                        accounts=accounts,
+                        consumers=1,
+                    )
+
+                    self.relays[broker] = relay
+
+                    # spawn a ``brokerd`` order control dialog stream
+                    # that syncs lifetime with the parent `emsd` daemon.
+                    self.nursery.start_soon(
+                        translate_and_relay_brokerd_events,
+                        broker,
+                        relay.brokerd_stream,
+                        self,
+                    )
+
+                    # dark book clearing loop, also lives with parent
+                    # daemon to allow dark order clearing while no
+                    # client is connected.
+                    self.nursery.start_soon(
+                        clear_dark_triggers,
+                        self,
+                        relay.brokerd_stream,
+                        quote_stream,
+                        broker,
+                        fqsn,  # form: <name>.<venue>.<suffix>.<broker>
+                        book
+                    )
+
+                    task_status.started((relay, feed))
+
+                    # this context should block here indefinitely until
+                    # the ``brokerd`` task either dies or is cancelled
+                    await trio.sleep_forever()
+
+            finally:
+                # parent context must have been closed remove from cache so
+                # next client will respawn if needed
+                relay = self.relays.pop(broker, None)
+                if not relay:
+                    log.warning(f'Relay for {broker} was already removed!?')
 
     async def client_broadcast(
         self,
@@ -447,7 +559,14 @@ class Router(Struct):
     ) -> None:
 
         to_remove: set[tractor.MsgStream] = set()
-        subs = self.subscribers[sub_key]
+
+        if sub_key == 'all':
+            subs = set()
+            for s in self.subscribers.values():
+                subs |= s
+        else:
+            subs = self.subscribers[sub_key]
+
         for client_stream in subs:
             try:
                 await client_stream.send(msg)
@@ -467,134 +586,6 @@
 _router: Router = None
 
 
-async def open_brokerd_trades_dialog(
-    router: Router,
-    feed: Feed,
-    symbol: str,
-    exec_mode: str,
-    loglevel: str,
-
-    task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
-
-) -> tuple[dict, tractor.MsgStream]:
-    '''
-    Open and yield ``brokerd`` trades dialogue context-stream if none
-    already exists.
-
-    '''
-    trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
-
-    broker = feed.mod.name
-
-    # TODO: make a `tractor` bug/test for this!
-    # if only i could member what the problem was..
-    # probably some GC of the portal thing?
-    # portal = feed.portal
-
-    # XXX: we must have our own portal + channel otherwise
-    # when the data feed closes it may result in a half-closed
-    # channel that the brokerd side thinks is still open somehow!?
-    async with maybe_spawn_brokerd(
-        broker,
-        loglevel=loglevel,
-    ) as portal:
-        if (
-            trades_endpoint is None
-            or exec_mode == 'paper'
-        ):
-            # for paper mode we need to mock this trades response feed
-            # so we load bidir stream to a new sub-actor running
-            # a paper-simulator clearing engine.
-
-            # load the paper trading engine
-            exec_mode = 'paper'
-            log.warning(f'Entering paper trading mode for {broker}')
-
-            # load the paper trading engine as a subactor of this emsd
-            # actor to simulate the real IPC load it'll have when also
-            # pulling data from feeds
-            open_trades_endpoint = paper.open_paperboi(
-                fqsn='.'.join([symbol, broker]),
-                loglevel=loglevel,
-            )
-
-        else:
-            # open live brokerd trades endpoint
-            open_trades_endpoint = portal.open_context(
-                trades_endpoint,
-                loglevel=loglevel,
-            )
-
-        try:
-            positions: list[BrokerdPosition]
-            accounts: tuple[str]
-
-            async with (
-                open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
-                brokerd_ctx.open_stream() as brokerd_trades_stream,
-            ):
-                # XXX: really we only want one stream per `emsd` actor
-                # to relay global `brokerd` order events unless we're
-                # going to expect each backend to relay only orders
-                # affiliated with a particular ``trades_dialogue()``
-                # session (seems annoying for implementers). So, here
-                # we cache the relay task and instead of running multiple
-                # tasks (which will result in multiples of the same msg being
-                # relayed for each EMS client) we just register each client
-                # stream to this single relay loop in the dialog table.
-
-                # begin processing order events from the target brokerd backend
-                # by receiving order submission response messages,
-                # normalizing them to EMS messages and relaying back to
-                # the piker order client set.
-
-                # locally cache and track positions per account with
-                # a table of (brokername, acctid) -> `BrokerdPosition`
-                # msgs.
-                pps = {}
-                for msg in positions:
-                    log.info(f'loading pp: {msg}')
-
-                    account = msg['account']
-
-                    # TODO: better value error for this which
-                    # dumps the account and message and states the
-                    # mismatch..
-                    assert account in accounts
-
-                    pps.setdefault(
-                        (broker, account),
-                        [],
-                    ).append(msg)
-
-                relay = TradesRelay(
-                    brokerd_stream=brokerd_trades_stream,
-                    positions=pps,
-                    accounts=accounts,
-                    consumers=1,
-                )
-
-                router.relays[broker] = relay
-
-                # the ems scan loop may be cancelled by the client but we
-                # want to keep the ``brokerd`` dialogue up regardless
-
-                task_status.started(relay)
-
-                # this context should block here indefinitely until
-                # the ``brokerd`` task either dies or is cancelled
-                await trio.sleep_forever()
-
-        finally:
-            # parent context must have been closed remove from cache so
-            # next client will respawn if needed
-            relay = router.relays.pop(broker, None)
-            if not relay:
-                log.warning(f'Relay for {broker} was already removed!?')
 
 
 @tractor.context
 async def _setup_persistent_emsd(
@@ -677,7 +668,7 @@ async def translate_and_relay_brokerd_events(
             # fan-out-relay position msgs immediately by
             # broadcasting updates on all client streams
-            await router.client_broadcast(sym, pos_msg)
+            await router.client_broadcast('all', pos_msg)
             continue
 
         # BrokerdOrderAck
@@ -827,7 +818,7 @@ async def translate_and_relay_brokerd_events(
                 # ``.oid`` in the msg since we're planning to
                 # maybe-kinda offer that via using ``Status``
                 # in the longer run anyway?
-                log.warning(f'Unkown fill for {fmsg}')
+                log.warning(f'Unknown fill for {fmsg}')
                 continue
 
             # proxy through the "fill" result(s)
@@ -1026,7 +1017,6 @@ async def process_client_order_cmds(
                 # acked yet by a brokerd, so register a cancel for when
                 # the order ack does show up later such that the brokerd
                 # order request can be cancelled at that time.
-                # dark_book._ems_entries[oid] = msg
 
                 # special case for now..
                 status.req = to_brokerd_msg
@@ -1286,7 +1276,6 @@ async def _emsd_main(
     from ..data._source import unpack_fqsn
     broker, symbol, suffix = unpack_fqsn(fqsn)
-    dark_book = _router.get_dark_book(broker)
 
     # TODO: would be nice if in tractor we can require either a ctx arg,
     # or a named arg with ctx in it and a type annotation of
@@ -1294,108 +1283,82 @@
     ems_ctx = ctx
 
     # spawn one task per broker feed
-    relay: TradesRelay
     feed: Feed
-    async with (
-        maybe_open_feed(
-            [fqsn],
-            loglevel=loglevel,
-        ) as (feed, quote_stream),
-    ):
-
-        # XXX: this should be initial price quote from target provider
-        first_quote: dict = feed.first_quotes[fqsn]
-
-        book: _DarkBook = _router.get_dark_book(broker)
-        book.lasts[fqsn]: float = first_quote['last']
-
-        # open a stream with the brokerd backend for order
-        # flow dialogue
-        async with (
-
-            # only open if one isn't already up: we try to keep
-            # as few duplicate streams as necessary
-            _router.maybe_open_brokerd_trades_dialogue(
-                feed,
-                symbol,
-                dark_book,
-                exec_mode,
-                loglevel,
-
-            ) as relay,
-            trio.open_nursery() as n,
-        ):
-
-            brokerd_stream = relay.brokerd_stream
-
-            # signal to client that we're started and deliver
-            # all known pps and accounts for this ``brokerd``.
-            await ems_ctx.started((
-                relay.positions,
-                list(relay.accounts),
-                book._active,
-            ))
-
-            # establish 2-way stream with requesting order-client and
-            # begin handling inbound order requests and updates
-            async with ems_ctx.open_stream() as client_stream:
-
-                # register the client side before starting the
-                # brokerd-side relay task to ensure the client is
-                # delivered all exisiting open orders on startup.
-                _router.clients.add(client_stream)
-
-                # TODO: instead of by fqsn we need a subscription
-                # system/schema here to limit what each new client is
-                # allowed to see in terms of broadcasted order flow
-                # updates per dialog.
-                _router.subscribers[fqsn].add(client_stream)
-
-                # trigger scan and exec loop
-                n.start_soon(
-                    clear_dark_triggers,
-                    _router,
-                    brokerd_stream,
-                    quote_stream,
-                    broker,
-                    fqsn,  # form: <name>.<venue>.<suffix>.<broker>
-                    book
-                )
-
-                # start inbound (from attached client) order request processing
-                # main entrypoint, run here until cancelled.
-                try:
-                    await process_client_order_cmds(
-                        client_stream,
-                        brokerd_stream,
-                        fqsn,
-                        feed,
-                        dark_book,
-                        _router,
-                    )
-
-                finally:
-                    # try to remove client from "registry"
-                    try:
-                        _router.clients.remove(client_stream)
-                    except KeyError:
-                        log.warning(
-                            f'Stream {client_stream._ctx.chan.uid}'
-                            ' was already dropped?'
-                        )
-
-                    _router.subscribers[fqsn].remove(client_stream)
-                    dialogs = _router.dialogs
-                    for oid, client_streams in dialogs.items():
-                        if client_stream in client_streams:
-                            client_streams.remove(client_stream)
-
-                            # TODO: for order dialogs left "alive" in
-                            # the ems this is where we should allow some
-                            # system to take over management. Likely we
-                            # want to allow the user to choose what kind
-                            # of policy to use (eg. cancel all orders
-                            # from client, run some algo, etc.)
-                            if not client_streams:
-                                log.warning(
-                                    f'Order dialog is being unmonitored:\n'
-                                    f'{oid} ->\n{client_stream._ctx.chan.uid}'
-                                )
+
+    # open a stream with the brokerd backend for order flow dialogue
+    # only open if one isn't already up: we try to keep as few duplicate
+    # streams as necessary.
+    # TODO: should we try using `tractor.trionics.maybe_open_context()`
+    # here?
+    relay, feed = await _router.nursery.start(
+        _router.maybe_open_trade_relays,
+        fqsn,
+        exec_mode,
+        loglevel,
+    )
+    brokerd_stream = relay.brokerd_stream
+    dark_book = _router.get_dark_book(broker)
+
+    # signal to client that we're started and deliver
+    # all known pps and accounts for this ``brokerd``.
+    await ems_ctx.started((
+        relay.positions,
+        list(relay.accounts),
+        dark_book._active,
+    ))
+
+    # establish 2-way stream with requesting order-client and
+    # begin handling inbound order requests and updates
+    async with ems_ctx.open_stream() as client_stream:
+
+        # register the client side before starting the
+        # brokerd-side relay task to ensure the client is
+        # delivered all exisiting open orders on startup.
+        _router.clients.add(client_stream)
+
+        # TODO: instead of by fqsn we need a subscription
+        # system/schema here to limit what each new client is
+        # allowed to see in terms of broadcasted order flow
+        # updates per dialog.
+        _router.subscribers[fqsn].add(client_stream)
+
+        # start inbound (from attached client) order request processing
+        # main entrypoint, run here until cancelled.
+        try:
+            await process_client_order_cmds(
+                client_stream,
+                brokerd_stream,
+                fqsn,
+                feed,
+                dark_book,
+                _router,
+            )
+
+        finally:
+            # try to remove client from "registry"
+            try:
+                _router.clients.remove(client_stream)
+            except KeyError:
+                log.warning(
+                    f'Stream {client_stream._ctx.chan.uid}'
+                    ' was already dropped?'
+                )
+
+            _router.subscribers[fqsn].remove(client_stream)
+            dialogs = _router.dialogs
+            for oid, client_streams in dialogs.items():
+                if client_stream in client_streams:
+                    client_streams.remove(client_stream)
+
+                    # TODO: for order dialogs left "alive" in
+                    # the ems this is where we should allow some
+                    # system to take over management. Likely we
+                    # want to allow the user to choose what kind
+                    # of policy to use (eg. cancel all orders
+                    # from client, run some algo, etc.)
+                    if not client_streams:
+                        log.warning(
+                            f'Order dialog is being unmonitored:\n'
+                            f'{oid} ->\n{client_stream._ctx.chan.uid}'
+                        )