piker/piker/clearing/_ems.py

1228 lines
42 KiB
Python
Raw Normal View History

# piker: trading gear for hackers
2021-01-05 18:37:03 +00:00
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
2021-01-14 17:59:00 +00:00
from pprint import pformat
import time
from typing import (
AsyncIterator,
Any,
Callable,
)
2021-01-14 17:59:00 +00:00
from bidict import bidict
import trio
from trio_typing import TaskStatus
import tractor
2021-02-22 22:28:34 +00:00
from ..log import get_logger
from ..data._normalize import iterticks
from ..data.feed import Feed, maybe_open_feed
from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
Status, Order,
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill, BrokerdError, BrokerdPosition,
)
log = get_logger(__name__)
# TODO: numba all of this
2021-03-29 12:35:58 +00:00
def mk_check(
2021-03-29 12:35:58 +00:00
trigger_price: float,
known_last: float,
action: str,
2021-03-29 12:35:58 +00:00
) -> Callable[[float, float], bool]:
'''
Create a predicate for given ``exec_price`` based on last known
price, ``known_last``.
This is an automatic alert level thunk generator based on where the
current last known value is and where the specified value of
interest is; pick an appropriate comparison operator based on
avoiding the case where the a predicate returns true immediately.
'''
# str compares:
# https://stackoverflow.com/questions/46708708/compare-strings-in-numba-compiled-function
if trigger_price >= known_last:
def check_gt(price: float) -> bool:
return price >= trigger_price
return check_gt
elif trigger_price <= known_last:
def check_lt(price: float) -> bool:
return price <= trigger_price
return check_lt
2022-06-01 18:42:39 +00:00
raise ValueError(
f'trigger: {trigger_price}, last: {known_last}'
)
2021-01-09 15:55:36 +00:00
@dataclass
class _DarkBook:
'''
EMS-trigger execution book.
Contains conditions for executions (aka "orders" or "triggers")
which are not exposed to brokers and thus the market; i.e. these are
privacy focussed "client side" orders which are submitted in real-time
based on specified trigger conditions.
An instance per `brokerd` is created per EMS actor (for now).
'''
2021-01-14 17:59:00 +00:00
broker: str
# levels which have an executable action (eg. alert, order, signal)
orders: dict[
2021-01-14 17:59:00 +00:00
str, # symbol
dict[
2021-01-08 03:08:25 +00:00
str, # uuid
tuple[
2021-01-08 03:08:25 +00:00
Callable[[float], bool], # predicate
str, # name
dict, # cmd / msg type
]
]
] = field(default_factory=dict)
# tracks most recent values per symbol each from data feed
lasts: dict[
str,
float,
] = field(default_factory=dict)
# mapping of piker ems order ids to current brokerd order flow message
_ems_entries: dict[str, str] = field(default_factory=dict)
_ems2brokerd_ids: dict[str, str] = field(default_factory=bidict)
2021-01-14 17:59:00 +00:00
# XXX: this is in place to prevent accidental positions that are too
# big. Now obviously this won't make sense for crypto like BTC, but
# for most traditional brokers it should be fine unless you start
# slinging NQ futes or something; check ur margin.
_DEFAULT_SIZE: float = 1.0
async def clear_dark_triggers(
brokerd_orders_stream: tractor.MsgStream,
ems_client_order_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str,
fqsn: str,
book: _DarkBook,
) -> None:
'''
Core dark order trigger loop.
Scan the (price) data feed and submit triggered orders
to broker.
'''
# XXX: optimize this for speed!
# TODO:
# - numba all this!
# - this stream may eventually contain multiple symbols
async for quotes in quote_stream:
# start = time.time()
for sym, quote in quotes.items():
execs = book.orders.get(sym, {})
for tick in iterticks(
quote,
# dark order price filter(s)
types=(
'ask',
'bid',
'trade',
'last',
# 'dark_trade', # TODO: should allow via config?
)
):
price = tick.get('price')
# update to keep new cmds informed
book.lasts[sym] = price
2022-08-04 19:42:31 +00:00
ttype = tick['type']
for oid, (
pred,
tf,
cmd,
percent_away,
abs_diff_away
) in (
tuple(execs.items())
):
if (
not pred
or ttype not in tf
or not pred(price)
):
2022-08-04 20:25:46 +00:00
# log.runtime(
# f'skipping quote for {sym} '
# f'{pred} -> {pred(price)}\n'
# f'{ttype} not in {tf}?'
# )
# majority of iterations will be non-matches
continue
2022-08-04 19:42:31 +00:00
match cmd:
# alert: nothing to do but relay a status
# back to the requesting ems client
case {
'action': 'alert',
}:
resp = 'alert_triggered'
# executable order submission
case {
'action': action,
'symbol': symbol,
'account': account,
'size': size,
}:
bfqsn: str = symbol.replace(f'.{broker}', '')
submit_price = price + abs_diff_away
resp = 'dark_triggered' # hidden on client-side
log.info(
f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}')
live_req = BrokerdOrder(
action=action,
oid=oid,
account=account,
time_ns=time.time_ns(),
symbol=bfqsn,
price=submit_price,
size=size,
)
await brokerd_orders_stream.send(live_req)
# mark this entry as having sent an order
# request. the entry will be replaced once the
# target broker replies back with
# a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems.
book._ems_entries[oid] = live_req
case _:
raise ValueError(f'Invalid dark book entry: {cmd}')
# fallthrough logic
resp = Status(
oid=oid, # ems dialog id
time_ns=time.time_ns(),
resp=resp,
trigger_price=price,
brokerd_msg=cmd,
)
# remove exec-condition from set
log.info(f'removing pred for {oid}')
pred = execs.pop(oid, None)
if not pred:
log.warning(
f'pred for {oid} was already removed!?'
)
2022-08-04 19:42:31 +00:00
# send response to client-side
try:
2022-08-04 19:42:31 +00:00
await ems_client_order_stream.send(resp)
except (
trio.ClosedResourceError,
):
log.warning(
2022-08-04 19:42:31 +00:00
f'{ems_client_order_stream} stream broke?'
)
break
else: # condition scan loop complete
log.debug(f'execs are {execs}')
if execs:
book.orders[fqsn] = execs
# print(f'execs scan took: {time.time() - start}')
2021-01-19 00:55:50 +00:00
@dataclass
class TradesRelay:
2021-09-10 15:33:08 +00:00
# for now we keep only a single connection open with
# each ``brokerd`` for simplicity.
brokerd_dialogue: tractor.MsgStream
2021-09-10 15:33:08 +00:00
# map of symbols to dicts of accounts to pp msgs
positions: dict[
# brokername, acctid
tuple[str, str],
list[BrokerdPosition],
]
2021-09-10 15:33:08 +00:00
# allowed account names
accounts: tuple[str]
2021-09-10 15:33:08 +00:00
# count of connected ems clients for this ``brokerd``
consumers: int = 0
class Router(Struct):
'''
Order router which manages and tracks per-broker dark book,
alerts, clearing and related data feed management.
A singleton per ``emsd`` actor.
'''
# setup at actor spawn time
nursery: trio.Nursery
# broker to book map
books: dict[str, _DarkBook] = {}
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
def get_dark_book(
self,
brokername: str,
) -> _DarkBook:
return self.books.setdefault(brokername, _DarkBook(brokername))
@asynccontextmanager
async def maybe_open_brokerd_trades_dialogue(
self,
feed: Feed,
symbol: str,
dark_book: _DarkBook,
exec_mode: str,
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
relay: TradesRelay = self.relays.get(feed.mod.name)
if (
relay is None
# We always want to spawn a new relay for the paper engine
# per symbol since we need a new tractor context to be
# opened for every every symbol such that a new data feed
# and ``PaperBoi`` client will be created and then used to
# simulate clearing events.
or exec_mode == 'paper'
):
relay = await self.nursery.start(
open_brokerd_trades_dialogue,
self,
feed,
symbol,
exec_mode,
loglevel,
)
relay.consumers += 1
# TODO: get updated positions here?
assert relay.brokerd_dialogue
try:
yield relay
finally:
# TODO: what exactly needs to be torn down here or
# are we just consumer tracking?
relay.consumers -= 1
_router: Router = None
async def open_brokerd_trades_dialogue(
router: Router,
feed: Feed,
symbol: str,
exec_mode: str,
loglevel: str,
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''
Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
trades_endpoint = getattr(feed.mod, 'trades_dialogue', None)
broker = feed.mod.name
# TODO: make a `tractor` bug/test for this!
2021-09-16 13:17:14 +00:00
# if only i could member what the problem was..
# probably some GC of the portal thing?
# portal = feed.portal
# XXX: we must have our own portal + channel otherwise
# when the data feed closes it may result in a half-closed
# channel that the brokerd side thinks is still open somehow!?
async with maybe_spawn_brokerd(
broker,
loglevel=loglevel,
) as portal:
if (
trades_endpoint is None
or exec_mode == 'paper'
):
# for paper mode we need to mock this trades response feed
# so we load bidir stream to a new sub-actor running
# a paper-simulator clearing engine.
# load the paper trading engine
exec_mode = 'paper'
log.warning(f'Entering paper trading mode for {broker}')
# load the paper trading engine as a subactor of this emsd
# actor to simulate the real IPC load it'll have when also
# pulling data from feeds
open_trades_endpoint = paper.open_paperboi(
fqsn='.'.join([symbol, broker]),
loglevel=loglevel,
)
else:
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
try:
positions: list[BrokerdPosition]
accounts: tuple[str]
async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
# doing to expect each backend to relay only orders
# affiliated with a particular ``trades_dialogue()``
# session (seems annoying for implementers). So, here
# we cache the relay task and instead of running multiple
# tasks (which will result in multiples of the same msg being
# relayed for each EMS client) we just register each client
# stream to this single relay loop using _router.dialogues
# begin processing order events from the target brokerd backend
# by receiving order submission response messages,
# normalizing them to EMS messages and relaying back to
# the piker order client set.
2021-09-13 12:21:42 +00:00
# locally cache and track positions per account with
# a table of (brokername, acctid) -> `BrokerdPosition`
# msgs.
pps = {}
for msg in positions:
log.info(f'loading pp: {msg}')
account = msg['account']
# TODO: better value error for this which
# dumps the account and message and states the
# mismatch..
assert account in accounts
2021-09-13 12:21:42 +00:00
pps.setdefault(
(broker, account),
[],
).append(msg)
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=pps,
accounts=accounts,
consumers=1,
)
_router.relays[broker] = relay
# the ems scan loop may be cancelled by the client but we
# want to keep the ``brokerd`` dialogue up regardless
task_status.started(relay)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
await trio.sleep_forever()
finally:
# parent context must have been closed
# remove from cache so next client will respawn if needed
relay = _router.relays.pop(broker, None)
if not relay:
log.warning(f'Relay for {broker} was already removed!?')
@tractor.context
async def _setup_persistent_emsd(
ctx: tractor.Context,
) -> None:
global _router
# open a root "service nursery" for the ``emsd`` actor
async with trio.open_nursery() as service_nursery:
_router = Router(nursery=service_nursery)
# TODO: send back the full set of persistent
# orders/execs?
await ctx.started()
# allow service tasks to run until cancelled
await trio.sleep_forever()
2021-01-14 17:59:00 +00:00
async def translate_and_relay_brokerd_events(
broker: str,
brokerd_trades_stream: tractor.MsgStream,
router: Router,
2021-01-09 15:55:36 +00:00
) -> AsyncIterator[dict]:
2022-02-10 16:58:45 +00:00
'''
Trades update loop - receive updates from ``brokerd`` trades
endpoint, convert to EMS response msgs, transmit **only** to
ordering client(s).
2021-01-09 15:55:36 +00:00
This is where trade confirmations from the broker are processed and
appropriate responses relayed **only** back to the original EMS
client actor. There is a messaging translation layer throughout.
2021-01-09 15:55:36 +00:00
2021-01-19 00:55:50 +00:00
Expected message translation(s):
broker ems
'error' -> log it locally (for now)
'status' -> relabel as 'broker_<status>', if complete send 'executed'
'fill' -> 'broker_filled'
Currently handled status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
2021-01-19 00:55:50 +00:00
'''
book: _DarkBook = router.get_dark_book(broker)
relay: TradesRelay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
brokerd_msg: dict[str, Any]
async for brokerd_msg in brokerd_trades_stream:
log.info(
f'Received broker trade event:\n'
f'{pformat(brokerd_msg)}'
)
match brokerd_msg:
# BrokerdPosition
case {
'name': 'position',
'symbol': sym,
'broker': broker,
}:
pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
# sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault(
# NOTE: translate to a FQSN!
(broker, sym),
[]
).append(pos_msg)
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(pos_msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
2021-01-23 04:00:01 +00:00
continue
# BrokerdOrderAck
case {
'name': 'ack',
'reqid': reqid, # brokerd generated order-request id
'oid': oid, # ems order-dialog id
} if (
entry := book._ems_entries.get(oid)
):
# initial response to brokerd order request
# if name == 'ack':
2021-06-10 12:24:10 +00:00
# register the brokerd request id (that was generated
# / created internally by the broker backend) with our
# local ems order id for reverse lookup later.
# a ``BrokerdOrderAck`` **must** be sent after an order
# request in order to establish this id mapping.
book._ems2brokerd_ids[oid] = reqid
log.info(
'Rx ACK for order\n'
f'oid: {oid} -> reqid: {reqid}'
)
# new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases:
# - the order has previously been requested to be
# cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
2021-08-29 16:23:01 +00:00
action = getattr(entry, 'action', None)
if action and action == 'cancel':
# assign newly providerd broker backend request id
entry.reqid = reqid
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# our book -> registered as live flow
else:
# update the flow with the ack msg
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
2021-01-08 03:08:25 +00:00
# no msg to client necessary
continue
# BrokerdOrderError
case {
'name': 'error',
'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id
'symbol': sym,
'broker_details': details,
# 'reason': reason,
}:
msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank?
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# This looks like a supervision policy for pending orders on
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
# BrokerdStatus
case {
'name': 'status',
'status': status,
'reqid': reqid, # brokerd generated order-request id
# TODO: feels like the wrong msg for this field?
'remaining': remaining,
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
msg = BrokerdStatus(**brokerd_msg)
# TODO: should we flatten out these cases and/or should
# they maybe even eventually be separate messages?
if status == 'cancelled':
log.info(f'Cancellation for {oid} is complete!')
if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not remaining:
resp = 'broker_executed'
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!')
# just log it
else:
log.info(f'{broker} filled {msg}')
else:
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
# unknown valid BrokerdStatus
case {
'name': 'status',
'status': status,
'reqid': reqid, # brokerd generated order-request id
'broker_details': details,
}:
# TODO: we probably want some kind of "tagging" system
# for external order submissions like this eventually
# to be able to more formally handle multi-player
# trading...
if status == 'submitted':
msg = BrokerdStatus(**brokerd_msg)
log.info(
f'Relaying existing open order:\n {brokerd_msg}'
)
# use backend request id as our ems id though this
# may end up with collisions?
broker = details['name']
oid = str(reqid)
book._ems_entries[oid] = msg
# attempt to avoid collisions
msg.reqid = oid
resp = 'broker_submitted'
# register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid
else:
log.error(
f'Unknown status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
continue
# BrokerdFill
case {
'name': 'fill',
'reqid': reqid, # brokerd generated order-request id
# 'symbol': sym, # paper engine doesn't have this, nbd?
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
):
# proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg)
resp = 'broker_filled'
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
2021-01-19 00:55:50 +00:00
# retrieve existing live flow
entry = book._ems_entries[oid]
if getattr(entry, 'oid', None):
assert entry.oid == oid
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client oid: {oid}')
else:
# existing open order relay
assert oid == entry.reqid
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
# flow is complete?
# router.dialogues.pop(oid)
2021-01-09 15:55:36 +00:00
async def process_client_order_cmds(
2022-08-04 20:25:46 +00:00
client_order_stream: tractor.MsgStream,
brokerd_order_stream: tractor.MsgStream,
symbol: str,
2022-08-04 20:25:46 +00:00
feed: Feed,
dark_book: _DarkBook,
router: Router,
) -> None:
client_dialogues = router.dialogues
# cmd: dict
async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}')
# CAWT DAMN we need struct support!
oid = str(cmd['oid'])
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
match cmd:
# existing live-broker order cancel
case {
'action': 'cancel',
'oid': oid,
} if live_entry:
2021-06-22 14:57:08 +00:00
reqid = live_entry.reqid
msg = BrokerdCancel(
oid=oid,
reqid=reqid,
time_ns=time.time_ns(),
account=live_entry.account,
)
# NOTE: cancel response will be relayed back in messages
# from corresponding broker
2022-07-09 03:16:29 +00:00
if reqid is not None:
2021-06-22 14:57:08 +00:00
# send cancel to brokerd immediately!
log.info(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg)
2021-06-22 14:57:08 +00:00
else:
# this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when
# the order ack does show up later such that the brokerd
# order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg
# dark trigger cancel
case {
'action': 'cancel',
'oid': oid,
} if not live_entry:
# try:
# remove from dark book clearing
dark_book.orders[symbol].pop(oid, None)
# tell client side that we've cancelled the
# dark-trigger order
await client_order_stream.send(
Status(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
)
)
# de-register this client dialogue
router.dialogues.pop(oid)
# except KeyError:
# log.exception(f'No dark order for {symbol}?')
# live order submission
case {
'oid': oid,
'symbol': fqsn,
'price': trigger_price,
'size': size,
'action': ('buy' | 'sell') as action,
'exec_mode': 'live',
}:
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
2022-08-04 19:53:50 +00:00
req = Order(**cmd)
broker = req.brokers[0]
# remove the broker part before creating a message
# to send to the specific broker since they probably
# aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '')
if live_entry is not None:
# sanity check on emsd id, but it won't work
# for pre-existing orders that we load since
# the only msg will be a ``BrokerdStatus``
# assert live_entry.oid == oid
reqid = live_entry.reqid
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(f"Modifying live {broker} order: {reqid}")
msg = BrokerdOrder(
oid=oid, # no ib support for oids...
time_ns=time.time_ns(),
# if this is None, creates a new order
# otherwise will modify any existing one
reqid=reqid,
symbol=sym,
action=action,
price=trigger_price,
size=size,
2022-08-04 19:53:50 +00:00
account=req.account,
)
# send request to backend
# XXX: the trades data broker response loop
# (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to
# the client/cmd sender from this request
2021-06-10 12:24:10 +00:00
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()``
# endpoint, but we register our request as part of the
# flow so that if a cancel comes from the requesting
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that live order asap.
dark_book._ems_entries[oid] = msg
# dark-order / alert submission
case {
'oid': oid,
'symbol': fqsn,
'price': trigger_price,
'size': size,
'exec_mode': exec_mode,
'action': action,
2022-08-04 19:53:50 +00:00
'brokers': brokers, # list
} if (
# "DARK" triggers
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
exec_mode in ('dark', 'paper')
or action == 'alert'
):
# Auto-gen scanner predicate:
# we automatically figure out what the alert check
# condition should be based on the current first
# price received from the feed, instead of being
# like every other shitty tina platform that makes
# the user choose the predicate operator.
last = dark_book.lasts[fqsn]
# sometimes the real-time feed hasn't come up
# so just pull from the latest history.
if isnan(last):
last = feed.shm.array[-1]['close']
2021-03-29 12:35:58 +00:00
pred = mk_check(trigger_price, last, action)
spread_slap: float = 5
2022-08-04 19:53:50 +00:00
sym = fqsn.replace(f'.{brokers[0]}', '')
min_tick = feed.symbols[sym].tick_size
if action == 'buy':
tickfilter = ('ask', 'last', 'trade')
percent_away = 0.005
# TODO: we probably need to scale this based
# on some near term historical spread
# measure?
abs_diff_away = spread_slap * min_tick
elif action == 'sell':
tickfilter = ('bid', 'last', 'trade')
percent_away = -0.005
abs_diff_away = -spread_slap * min_tick
else: # alert
tickfilter = ('trade', 'utrade', 'last')
percent_away = 0
abs_diff_away = 0
# submit execution/order to EMS scan loop
# NOTE: this may result in an override of an existing
# dark book entry if the order id already exists
dark_book.orders.setdefault(
fqsn, {}
)[oid] = (
pred,
tickfilter,
cmd,
percent_away,
abs_diff_away
)
resp = 'dark_submitted'
# alerts have special msgs to distinguish
if action == 'alert':
resp = 'alert_submitted'
await client_order_stream.send(
Status(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
)
)
@tractor.context
async def _emsd_main(
2021-01-14 17:59:00 +00:00
ctx: tractor.Context,
fqsn: str,
exec_mode: str, # ('paper', 'live')
loglevel: str = 'info',
2021-01-14 17:59:00 +00:00
) -> None:
2022-08-04 20:25:46 +00:00
'''
EMS (sub)actor entrypoint providing the execution management
(micro)service which conducts broker order clearing control on
behalf of clients.
2021-01-19 00:55:50 +00:00
This is the daemon (child) side routine which starts an EMS runtime
task (one per broker-feed) and and begins streaming back alerts from
each broker's executions/fills.
2021-01-19 00:55:50 +00:00
``send_order_cmds()`` is called here to execute in a task back in
the actor which started this service (spawned this actor), presuming
capabilities allow it, such that requests for EMS executions are
received in a stream from that client actor and then responses are
streamed back up to the original calling task in the same client.
The primary ``emsd`` task tree is:
2021-01-08 03:08:25 +00:00
- ``_emsd_main()``:
sets up brokerd feed, order feed with ems client, trades dialogue with
brokderd trading api.
|
2021-06-08 16:50:52 +00:00
- ``clear_dark_triggers()``:
run (dark order) conditions on inputs and trigger brokerd "live"
order submissions.
|
- (maybe) ``translate_and_relay_brokerd_events()``:
accept normalized trades responses from brokerd, process and
relay to ems client(s); this is a effectively a "trade event
reponse" proxy-broker.
|
- ``process_client_order_cmds()``:
accepts order cmds from requesting clients, registers dark orders and
alerts with clearing loop.
2021-01-08 03:08:25 +00:00
2021-06-10 12:24:10 +00:00
'''
global _router
assert _router
from ..data._source import unpack_fqsn
broker, symbol, suffix = unpack_fqsn(fqsn)
dark_book = _router.get_dark_book(broker)
2021-01-09 15:55:36 +00:00
2021-06-10 12:24:10 +00:00
# TODO: would be nice if in tractor we can require either a ctx arg,
# or a named arg with ctx in it and a type annotation of
# tractor.Context instead of strictly requiring a ctx arg.
ems_ctx = ctx
# spawn one task per broker feed
2022-08-04 20:25:46 +00:00
feed: Feed
async with (
maybe_open_feed(
[fqsn],
loglevel=loglevel,
) as (feed, quote_stream),
):
# XXX: this should be initial price quote from target provider
first_quote: dict = feed.first_quotes[fqsn]
book: _DarkBook = _router.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last']
# open a stream with the brokerd backend for order
# flow dialogue
async with (
# only open if one isn't already up: we try to keep
# as few duplicate streams as necessary
_router.maybe_open_brokerd_trades_dialogue(
feed,
symbol,
dark_book,
exec_mode,
loglevel,
) as relay,
trio.open_nursery() as n,
):
brokerd_stream = relay.brokerd_dialogue # .clone()
# signal to client that we're started and deliver
# all known pps and accounts for this ``brokerd``.
await ems_ctx.started((
relay.positions,
list(relay.accounts),
book._ems_entries,
))
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
async with ems_ctx.open_stream() as ems_client_order_stream:
# register the client side before startingn the
# brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup.
_router.clients.add(ems_client_order_stream)
n.start_soon(
translate_and_relay_brokerd_events,
broker,
brokerd_stream,
_router,
)
# trigger scan and exec loop
n.start_soon(
clear_dark_triggers,
brokerd_stream,
ems_client_order_stream,
quote_stream,
broker,
fqsn, # form: <name>.<venue>.<suffix>.<broker>
book
)
# start inbound (from attached client) order request processing
try:
# main entrypoint, run here until cancelled.
await process_client_order_cmds(
ems_client_order_stream,
# relay.brokerd_dialogue,
brokerd_stream,
fqsn,
feed,
dark_book,
_router,
)
finally:
# try to remove client from "registry"
try:
_router.clients.remove(ems_client_order_stream)
except KeyError:
log.warning(
f'Stream {ems_client_order_stream._ctx.chan.uid}'
' was already dropped?'
)
dialogues = _router.dialogues
for oid, client_stream in dialogues.copy().items():
if client_stream == ems_client_order_stream:
log.warning(
f'client dialogue is being abandoned:\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)
dialogues.pop(oid)
# TODO: for order dialogues left "alive" in
# the ems this is where we should allow some
# system to take over management. Likely we
# want to allow the user to choose what kind
# of policy to use (eg. cancel all orders
# from client, run some algo, etc.)