Merge pull request #202 from pikers/minimal_brokerd_trade_dialogues

Minimal `brokerd` trade dialogues
ci_on_forks
goodboy 2021-07-08 10:55:22 -04:00 committed by GitHub
commit e4518c59af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 418 additions and 188 deletions

View File

@ -18,19 +18,23 @@
In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from pprint import pformat
import time
from dataclasses import dataclass, field
from typing import AsyncIterator, Callable, Any
from bidict import bidict
from pydantic import BaseModel
import trio
from trio_typing import TaskStatus
import tractor
from .. import data
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
Status, Order,
@ -80,15 +84,16 @@ def mk_check(
@dataclass
class _DarkBook:
"""Client-side execution book.
'''EMS-trigger execution book.
Contains conditions for executions (aka "orders") which are not
exposed to brokers and thus the market; i.e. these are privacy
focussed "client side" orders.
Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are
privacy focussed "client side" orders which are submitted in real-time
based on specified trigger conditions.
A singleton instance is created per EMS actor (for now).
An instance per `brokerd` is created per EMS actor (for now).
"""
'''
broker: str
# levels which have an executable action (eg. alert, order, signal)
@ -254,30 +259,237 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}')
# TODO: lots of cases still to handle
# XXX: right now this is very very ad-hoc to IB
# - short-sale but securities haven't been located, in this case we
# should probably keep the order in some kind of weird state or cancel
# it outright?
# status='PendingSubmit', message=''),
# status='Cancelled', message='Error 404,
# reqId 1550: Order held while securities are located.'),
# status='PreSubmitted', message='')],
@dataclass
class TradesRelay:
brokerd_dialogue: tractor.MsgStream
positions: dict[str, float]
consumers: int = 0
class _Router(BaseModel):
'''Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
A singleton per ``emsd`` actor.
'''
# setup at actor spawn time
nursery: trio.Nursery
feeds: dict[tuple[str, str], Any] = {}
# broker to book map
books: dict[str, _DarkBook] = {}
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
def get_dark_book(
self,
brokername: str,
) -> _DarkBook:
return self.books.setdefault(brokername, _DarkBook(brokername))
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
self,
feed: Feed,
symbol: str,
dark_book: _DarkBook,
_exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
relay = self.relays.get(feed.mod.name)
if relay is None:
relay = await self.nursery.start(
open_brokerd_trades_dialogue,
self,
feed,
symbol,
_exec_mode,
loglevel,
)
relay.consumers += 1
# TODO: get updated positions here?
assert relay.brokerd_dialogue
try:
yield relay
finally:
# TODO: what exactly needs to be torn down here or
# are we just consumer tracking?
relay.consumers -= 1
_router: _Router = None
async def open_brokerd_trades_dialogue(
router: _Router,
feed: Feed,
symbol: str,
_exec_mode: str,
loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
broker = feed.mod.name
# TODO: make a `tractor` bug/test for this!
# portal = feed._brokerd_portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed
# channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd(
broker,
loglevel=loglevel,
) as portal:
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
try:
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=positions,
consumers=1
)
_router.relays[broker] = relay
# the ems scan loop may be cancelled by the client but we
# want to keep the ``brokerd`` dialogue up regardless
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
_router.relays.pop(broker)
@tractor.context
async def _setup_persistent_emsd(
ctx: tractor.Context,
) -> None:
global _router
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent
# orders/execs?
await ctx.started()
# allow service tasks to run until cancelled
await trio.sleep_forever()
async def translate_and_relay_brokerd_events(
broker: str,
ems_client_order_stream: tractor.MsgStream,
brokerd_trades_stream: tractor.MsgStream,
book: _DarkBook,
router: _Router,
) -> AsyncIterator[dict]:
"""Trades update loop - receive updates from broker, convert
to EMS responses, transmit to ordering client(s).
'''Trades update loop - receive updates from ``brokerd`` trades
endpoint, convert to EMS response msgs, transmit **only** to
ordering client(s).
This is where trade confirmations from the broker are processed
and appropriate responses relayed back to the original EMS client
actor. There is a messaging translation layer throughout.
This is where trade confirmations from the broker are processed and
appropriate responses relayed **only** back to the original EMS
client actor. There is a messaging translation layer throughout.
Expected message translation(s):
@ -286,10 +498,15 @@ async def translate_and_relay_brokerd_events(
'status' -> relabel as 'broker_<status>', if complete send 'executed'
'fill' -> 'broker_filled'
Currently accepted status values from IB:
Currently handled status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
"""
'''
book = router.get_dark_book(broker)
relay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
async for brokerd_msg in brokerd_trades_stream:
name = brokerd_msg['name']
@ -298,10 +515,16 @@ async def translate_and_relay_brokerd_events(
if name == 'position':
# relay through position msgs immediately
await ems_client_order_stream.send(
BrokerdPosition(**brokerd_msg).dict()
)
pos_msg = BrokerdPosition(**brokerd_msg).dict()
# keep up to date locally in ``emsd``
relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg)
# relay through position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients:
await client_stream.send(pos_msg)
continue
# Get the broker (order) request id, this **must** be normalized
@ -331,7 +554,7 @@ async def translate_and_relay_brokerd_events(
# may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib)
ext = brokerd_msg.get('external')
ext = brokerd_msg['broker_details'].get('external')
if ext:
log.error(f"External trade event {ext}")
@ -407,22 +630,12 @@ async def translate_and_relay_brokerd_events(
elif name in (
'status',
):
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# everyone doin camel case
msg = BrokerdStatus(**brokerd_msg)
if msg.status == 'cancelled':
log.info(f'Cancellation for {oid} is complete!')
if msg.status == 'filled':
# conditional execution is fully complete, no more
@ -431,6 +644,8 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_executed'
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!')
# just log it
@ -460,6 +675,8 @@ async def translate_and_relay_brokerd_events(
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
@ -469,6 +686,14 @@ async def translate_and_relay_brokerd_events(
brokerd_msg=broker_details,
).dict()
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?
# router.dialogues.pop(oid)
async def process_client_order_cmds(
@ -477,11 +702,14 @@ async def process_client_order_cmds(
brokerd_order_stream: tractor.MsgStream,
symbol: str,
feed: 'Feed', # noqa
feed: Feed, # noqa
dark_book: _DarkBook,
router: _Router,
) -> None:
client_dialogues = router.dialogues
# cmd: dict
async for cmd in client_order_stream:
@ -489,6 +717,18 @@ async def process_client_order_cmds(
action = cmd['action']
oid = cmd['oid']
# TODO: make ``tractor.MsgStream`` a frozen type again such that it
# can be stored in sets like the old context was.
# wait, maybe this **is** already working thanks to our parent
# `trio` type?
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
@ -498,27 +738,32 @@ async def process_client_order_cmds(
# check for live-broker order
if live_entry:
reqid = live_entry.reqid
msg = BrokerdCancel(
oid=oid,
reqid=reqid or live_entry.reqid,
reqid=reqid,
time_ns=time.time_ns(),
)
# send cancel to brokerd immediately!
log.info("Submitting cancel for live order")
# NOTE: cancel response will be relayed back in messages
# from corresponding broker
if reqid:
# send cancel to brokerd immediately!
log.info("Submitting cancel for live order {reqid}")
await brokerd_order_stream.send(msg.dict())
else:
# this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when
# the order ack does show up later
# the order ack does show up later such that the brokerd
# order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg
# check for EMS active exec
# dark trigger cancel
else:
try:
# remove from dark book clearing
dark_book.orders[symbol].pop(oid, None)
@ -532,6 +777,8 @@ async def process_client_order_cmds(
time_ns=time.time_ns(),
).dict()
)
# de-register this client dialogue
router.dialogues.pop(oid)
except KeyError:
log.exception(f'No dark order for {symbol}?')
@ -581,17 +828,22 @@ async def process_client_order_cmds(
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict())
# an immediate response should be brokerd ack with order
# id but we register our request as part of the flow
# an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()``
# endpoint, but we register our request as part of the
# flow so that if a cancel comes from the requesting
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that live order asap.
dark_book._ems_entries[oid] = msg
elif exec_mode in ('dark', 'paper') or (
action in ('alert')
):
# "DARK" triggers
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
elif exec_mode in ('dark', 'paper') or (
action in ('alert')
):
# Auto-gen scanner predicate:
# we automatically figure out what the alert check
# condition should be based on the current first
@ -637,11 +889,11 @@ async def process_client_order_cmds(
percent_away,
abs_diff_away
)
resp = 'dark_submitted'
# alerts have special msgs to distinguish
if action == 'alert':
resp = 'alert_submitted'
else:
resp = 'dark_submitted'
await client_order_stream.send(
Status(
@ -686,7 +938,7 @@ async def _emsd_main(
run (dark order) conditions on inputs and trigger brokerd "live"
order submissions.
|
- ``translate_and_relay_brokerd_events()``:
- (maybe) ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and
relay to ems client(s); this is a effectively a "trade event
reponse" proxy-broker.
@ -697,6 +949,8 @@ async def _emsd_main(
'''
global _router
assert _router
dark_book = _router.get_dark_book(broker)
# TODO: would be nice if in tractor we can require either a ctx arg,
@ -711,8 +965,6 @@ async def _emsd_main(
# spawn one task per broker feed
async with (
trio.open_nursery() as n,
# TODO: eventually support N-brokers
data.open_feed(
broker,
@ -732,43 +984,28 @@ async def _emsd_main(
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote[symbol]['last']
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
portal = feed._brokerd_portal
if trades_endpoint is None or _exec_mode == 'paper':
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running a
# paper-simulator clearing engine.
# load the paper trading engine
_exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
broker=broker,
symbol=symbol,
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
async with (
open_trades_endpoint as (brokerd_ctx, positions),
brokerd_ctx.open_stream() as brokerd_trades_stream,
# only open if one isn't already up: we try to keep
# as few duplicate streams as necessary
_router.maybe_open_brokerd_trades_dialogue(
feed,
symbol,
dark_book,
_exec_mode,
loglevel,
) as relay,
trio.open_nursery() as n,
):
brokerd_stream = relay.brokerd_dialogue # .clone()
# signal to client that we're started
# TODO: we could eventually send back **all** brokerd
# positions here?
await ems_ctx.started(positions)
await ems_ctx.started(relay.positions)
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
@ -778,7 +1015,8 @@ async def _emsd_main(
n.start_soon(
clear_dark_triggers,
brokerd_trades_stream,
# relay.brokerd_dialogue,
brokerd_stream,
ems_client_order_stream,
feed.stream,
@ -787,72 +1025,42 @@ async def _emsd_main(
book
)
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client.
n.start_soon(
translate_and_relay_brokerd_events,
broker,
ems_client_order_stream,
brokerd_trades_stream,
dark_book,
)
# start inbound (from attached client) order request processing
try:
_router.clients.add(ems_client_order_stream)
await process_client_order_cmds(
ems_client_order_stream,
brokerd_trades_stream,
# relay.brokerd_dialogue,
brokerd_stream,
symbol,
feed,
dark_book,
_router,
)
finally:
# remove client from "registry"
_router.clients.remove(ems_client_order_stream)
class _Router(BaseModel):
'''Order router which manages per-broker dark books, alerts,
and clearing related data feed management.
dialogues = _router.dialogues
'''
nursery: trio.Nursery
for oid, client_stream in dialogues.items():
feeds: dict[tuple[str, str], Any] = {}
books: dict[str, _DarkBook] = {}
if client_stream == ems_client_order_stream:
class Config:
arbitrary_types_allowed = True
underscore_attrs_are_private = False
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
def get_dark_book(
self,
brokername: str,
) -> _DarkBook:
return self.books.setdefault(brokername, _DarkBook(brokername))
_router: _Router = None
@tractor.context
async def _setup_persistent_emsd(
ctx: tractor.Context,
) -> None:
global _router
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = _Router(nursery=service_nursery)
# TODO: send back the full set of persistent orders/execs persistent
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.)

View File

@ -233,12 +233,13 @@ async def sample_and_broadcast(
for (stream, tick_throttle) in subs:
try:
if tick_throttle:
await stream.send(quote)
else:
try:
await stream.send({sym: quote})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
@ -247,13 +248,19 @@ async def sample_and_broadcast(
# if it's done in the fee bus code?
# so far seems like no since this should all
# be single-threaded.
log.error(f'{stream._ctx.chan.uid} dropped connection')
log.warning(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
subs.remove((stream, tick_throttle))
async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream,
) -> None:
sleep_period = 1/rate - 0.000616
@ -289,8 +296,14 @@ async def uniform_rate_send(
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({first_quote['symbol']: first_quote})
break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop
# out of this rate limiter
log.warning(f'{stream} closed')
return
end = time.time()
diff = end - start

View File

@ -305,6 +305,11 @@ async def attach_feed_bus(
):
if tick_throttle:
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
send, recv = trio.open_memory_channel(2**10)
n.start_soon(
uniform_rate_send,
@ -321,7 +326,12 @@ async def attach_feed_bus(
try:
await trio.sleep_forever()
finally:
log.info(
f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}')
if tick_throttle:
n.cancel_scope.cancel()
bus._subscribers[symbol].remove(sub)
@ -473,11 +483,6 @@ async def open_feed(
ctx.open_stream() as stream,
):
# TODO: can we make this work better with the proposed
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
# init_msg = await stream.receive()
# we can only read from shm
shm = attach_shm_array(
token=init_msg[sym]['shm_token'],
@ -520,4 +525,8 @@ async def open_feed(
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed
finally:
# drop the infinite stream connection
await ctx.cancel()