Support multiple client trade flows over one `brokerd~

There is no reason to have more then `brokerd` trades dialogue stream
open per `emsd`. Here we minimize to managing that lone stream and
multiplexing msgs from each client such that multiple clients can be
connected to the ems, conducting trading without requiring multiple
ems-client connections to the backend broker and without the broker
being aware there are even multiple flows going on.

This patch also sets up for being able to have ems clients which
register to receive and track trade flows from other piker clients thus
enabling so called "multi-player" trading where orders for both paper
and live trades can be shared between multiple participants in the form
of a pre-broker, local clearing service and trade signals dark book.
minimal_brokerd_trade_dialogues
Tyler Goodlet 2021-06-25 00:57:58 -04:00
parent 8b6fb83257
commit 9f897ba75c
1 changed files with 270 additions and 153 deletions

View File

@ -27,12 +27,14 @@ from typing import AsyncIterator, Callable, Any
from bidict import bidict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
from .. import data
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
Status, Order,
@ -82,15 +84,16 @@ def mk_check(
@dataclass
class _DarkBook:
"""Client-side execution book.
'''EMS-trigger execution book.
Contains conditions for executions (aka "orders") which are not
exposed to brokers and thus the market; i.e. these are privacy
focussed "client side" orders.
Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are
privacy focussed "client side" orders which are submitted in real-time
based on specified trigger conditions.
A singleton instance is created per EMS actor (for now).
A an instance per `brokerd` is created per EMS actor (for now).
"""
'''
broker: str
# levels which have an executable action (eg. alert, order, signal)
@ -256,17 +259,34 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}')
@dataclass
class TradesRelay:
brokerd_dialogue: tractor.MsgStream
positions: dict[str, float]
consumers: int = 0
class _Router(BaseModel):
'''Order router which manages per-broker dark books, alerts,
and clearing related data feed management.
'''Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
A singleton per ``emsd`` actor.
'''
# setup at actor spawn time
nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {}
# broker to book map
books: dict[str, _DarkBook] = {}
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {}
relays: dict[str, tuple[dict, tractor.MsgStream]] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
class Config:
arbitrary_types_allowed = True
@ -280,10 +300,160 @@ class _Router(BaseModel):
return self.books.setdefault(brokername, _DarkBook(brokername))
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
self,
feed: Feed,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
relay = self.relays.get(feed.mod.name)
if relay is None:
relay = await self.nursery.start(
open_brokerd_trades_dialogue,
self,
feed,
symbol,
_exec_mode,
loglevel,
)
relay.consumers += 1
# TODO: get updated positions here?
assert relay.brokerd_dialogue
try:
yield relay
finally:
# TODO: what exactly needs to be torn down here or
# are we just consumer tracking?
relay.consumers -= 1
_router: _Router = None
async def open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
symbol: str,
_exec_mode: str,
loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
broker = feed.mod.name
# TODO: make a `tractor` bug about this!
# portal = feed._brokerd_portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed/fucked
# channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd(
broker,
loglevel=loglevel,
) as portal:
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
try:
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=positions,
consumers=1
)
_router.relays[broker] = relay
# the ems scan loop may be cancelled by the client but we
# want to keep the ``brokerd`` dialogue up regardless
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context
async def _setup_persistent_emsd(
@ -298,20 +468,18 @@ async def _setup_persistent_emsd(
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent orders/execs persistent
# TODO: send back the full set of persistent
# orders/execs?
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
# allow service tasks to run until cancelled
await trio.sleep_forever()
async def translate_and_relay_brokerd_events(
broker: str,
# ems_client_order_stream: tractor.MsgStream,
brokerd_trades_stream: tractor.MsgStream,
book: _DarkBook,
router: _Router,
) -> AsyncIterator[dict]:
@ -334,6 +502,11 @@ async def translate_and_relay_brokerd_events(
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
'''
book = router.get_dark_book(broker)
relay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name']
@ -342,13 +515,15 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict()
# keep up to date locally in ``emsd``
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
# relay through position msgs immediately by
# broadcasting updates on all client streams
for oid, ems_client_order_stream in router.dialogues.items():
await ems_client_order_stream.send(
BrokerdPosition(**brokerd_msg).dict()
)
for client_stream in router.clients:
await client_stream.send(pos_msg)
continue
@ -425,7 +600,7 @@ async def translate_and_relay_brokerd_events(
resp = None
broker_details = {}
client_flow_complete: bool = False
# client_flow_complete: bool = False
if name in (
'error',
@ -460,7 +635,7 @@ async def translate_and_relay_brokerd_events(
if msg.status == 'cancelled':
client_flow_complete = True
# client_flow_complete = True
log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled':
@ -473,7 +648,7 @@ async def translate_and_relay_brokerd_events(
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
client_flow_complete = True
# client_flow_complete = True
log.info(f'Execution for {oid} is complete!')
# just log it
@ -503,22 +678,26 @@ async def translate_and_relay_brokerd_events(
# Create and relay response status message
# to requesting EMS client
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
).dict()
)
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=broker_details,
).dict()
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?
# if client_flow_complete:
# router.dialogues.pop(oid)
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?
# if client_flow_complete:
# router.dialogues.pop(oid)
async def process_client_order_cmds(
@ -533,6 +712,8 @@ async def process_client_order_cmds(
) -> None:
client_dialogues = router.dialogues
# cmd: dict
async for cmd in client_order_stream:
@ -541,14 +722,16 @@ async def process_client_order_cmds(
action = cmd['action']
oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs).
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
router.dialogues[oid] = client_order_stream
client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
@ -655,7 +838,7 @@ async def process_client_order_cmds(
# flow so that if a cancel comes from the requesting
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that order with them immediately.
# that live order asap.
dark_book._ems_entries[oid] = msg
# "DARK" triggers
@ -725,102 +908,6 @@ async def process_client_order_cmds(
)
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
broker: str,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if broker in _router.relays:
positions, brokerd_trades_stream = _router.relays[broker]
# TODO: get updated positions here?
yield positions, brokerd_trades_stream
return
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
trio.open_nursery() as n,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
n.start_soon(
translate_and_relay_brokerd_events,
broker,
# ems_client_order_stream,
brokerd_trades_stream,
dark_book,
_router,
)
_router.relays[broker] = (positions, brokerd_trades_stream)
try:
yield positions, brokerd_trades_stream
finally:
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context
async def _emsd_main(
@ -855,7 +942,7 @@ async def _emsd_main(
run (dark order) conditions on inputs and trigger brokerd "live"
order submissions.
|
- ``translate_and_relay_brokerd_events()``:
- (maybe) ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and
relay to ems client(s); this is a effectively a "trade event
reponse" proxy-broker.
@ -905,24 +992,24 @@ async def _emsd_main(
# only open if one isn't already up: we try to keep
# as few duplicate streams as necessary
maybe_open_brokerd_trades_dialogue(
_router,
_router.maybe_open_brokerd_trades_dialogue(
feed,
broker,
symbol,
dark_book,
_exec_mode,
loglevel,
) as (positions, brokerd_trades_stream),
) as relay,
trio.open_nursery() as n,
):
brokerd_stream = relay.brokerd_dialogue # .clone()
# signal to client that we're started
# TODO: we could eventually send back **all** brokerd
# positions here?
await ems_ctx.started(positions)
await ems_ctx.started(relay.positions)
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
@ -932,7 +1019,8 @@ async def _emsd_main(
n.start_soon(
clear_dark_triggers,
brokerd_trades_stream,
# relay.brokerd_dialogue,
brokerd_stream,
ems_client_order_stream,
feed.stream,
@ -942,12 +1030,41 @@ async def _emsd_main(
)
# start inbound (from attached client) order request processing
await process_client_order_cmds(
try:
_router.clients.add(ems_client_order_stream)
ems_client_order_stream,
brokerd_trades_stream,
symbol,
feed,
dark_book,
_router,
)
await process_client_order_cmds(
ems_client_order_stream,
# relay.brokerd_dialogue,
brokerd_stream,
symbol,
feed,
dark_book,
_router,
)
finally:
# remove client from "registry"
_router.clients.remove(ems_client_order_stream)
dialogues = _router.dialogues
for oid, client_stream in dialogues.items():
if client_stream == ems_client_order_stream:
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.).