Expect fqsn in ems and order mode

Use fqsn as input to the client-side EMS apis but strip broker-name
stuff before generating and sending `Brokerd*` msgs to each backend for
live order requests (since it's weird for a backend to expect it's own
name, though maybe that could be a sanity check?).

Summary of fqsn use vs. broker native keys:
- client side pps, order requests and general UX for order management
  use an fqsn for tracking
- brokerd side order dialogs use the broker-specific symbol which is
  usually nearly the same key minus the broker name
- internal dark book and quote feed lookups use the fqsn where possible
mkts_backup
Tyler Goodlet 2022-03-18 17:31:09 -04:00
parent 4c6e5598f2
commit 8e8c1c14ce
3 changed files with 67 additions and 54 deletions

View File

@ -18,7 +18,7 @@
Orders and execution client API.
"""
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager as acm
from typing import Dict
from pprint import pformat
from dataclasses import dataclass, field
@ -27,7 +27,6 @@ import trio
import tractor
from tractor.trionics import broadcast_receiver
from ..data._source import Symbol
from ..log import get_logger
from ._ems import _emsd_main
from .._daemon import maybe_open_emsd
@ -156,16 +155,19 @@ async def relay_order_cmds_from_sync_code(
await to_ems_stream.send(cmd)
@asynccontextmanager
@acm
async def open_ems(
broker: str,
symbol: Symbol,
fqsn: str,
) -> (OrderBook, tractor.MsgStream, dict):
"""Spawn an EMS daemon and begin sending orders and receiving
) -> (
OrderBook,
tractor.MsgStream,
dict,
):
'''
Spawn an EMS daemon and begin sending orders and receiving
alerts.
This EMS tries to reduce most broker's terrible order entry apis to
a very simple protocol built on a few easy to grok and/or
"rantsy" premises:
@ -194,21 +196,22 @@ async def open_ems(
- 'dark_executed', 'broker_executed'
- 'broker_filled'
"""
'''
# wait for service to connect back to us signalling
# ready for order commands
book = get_orders()
from ..data._source import uncons_fqsn
broker, symbol, suffix = uncons_fqsn(fqsn)
async with maybe_open_emsd(broker) as portal:
async with (
# connect to emsd
portal.open_context(
_emsd_main,
broker=broker,
symbol=symbol.key,
fqsn=fqsn,
) as (ctx, (positions, accounts)),
@ -218,7 +221,7 @@ async def open_ems(
async with trio.open_nursery() as n:
n.start_soon(
relay_order_cmds_from_sync_code,
symbol.key,
fqsn,
trades_stream
)

View File

@ -20,7 +20,7 @@ In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
# from math import isnan
from pprint import pformat
import time
from typing import AsyncIterator, Callable
@ -113,8 +113,8 @@ class _DarkBook:
# tracks most recent values per symbol each from data feed
lasts: dict[
tuple[str, str],
float
str,
float,
] = field(default_factory=dict)
# mapping of piker ems order ids to current brokerd order flow message
@ -135,7 +135,7 @@ async def clear_dark_triggers(
ems_client_order_stream: tractor.MsgStream,
quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str,
symbol: str,
fqsn: str,
book: _DarkBook,
@ -155,7 +155,6 @@ async def clear_dark_triggers(
# start = time.time()
for sym, quote in quotes.items():
execs = book.orders.get(sym, {})
for tick in iterticks(
quote,
# dark order price filter(s)
@ -171,7 +170,7 @@ async def clear_dark_triggers(
ttype = tick['type']
# update to keep new cmds informed
book.lasts[(broker, symbol)] = price
book.lasts[sym] = price
for oid, (
pred,
@ -196,6 +195,7 @@ async def clear_dark_triggers(
action: str = cmd['action']
symbol: str = cmd['symbol']
bfqsn: str = symbol.replace(f'.{broker}', '')
if action == 'alert':
# nothing to do but relay a status
@ -225,7 +225,7 @@ async def clear_dark_triggers(
# order-request and instead create a new one.
reqid=None,
symbol=sym,
symbol=bfqsn,
price=submit_price,
size=cmd['size'],
)
@ -247,12 +247,9 @@ async def clear_dark_triggers(
oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(),
symbol=symbol,
symbol=fqsn,
trigger_price=price,
broker_details={'name': broker},
cmd=cmd, # original request message
).dict()
@ -270,7 +267,7 @@ async def clear_dark_triggers(
else: # condition scan loop complete
log.debug(f'execs are {execs}')
if execs:
book.orders[symbol] = execs
book.orders[fqsn] = execs
# print(f'execs scan took: {time.time() - start}')
@ -382,7 +379,8 @@ async def open_brokerd_trades_dialogue(
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
'''
Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
@ -458,12 +456,13 @@ async def open_brokerd_trades_dialogue(
# locally cache and track positions per account.
pps = {}
for msg in positions:
log.info(f'loading pp: {msg}')
account = msg['account']
assert account in accounts
pps.setdefault(
msg['symbol'],
f'{msg["symbol"]}.{broker}',
{}
)[account] = msg
@ -562,7 +561,13 @@ async def translate_and_relay_brokerd_events(
# XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem
relay.positions.setdefault(pos_msg['symbol'], {}).setdefault(
sym, broker = pos_msg['symbol'], pos_msg['broker']
relay.positions.setdefault(
# NOTE: translate to a FQSN!
f'{sym}.{broker}',
{}
).setdefault(
pos_msg['account'], {}
).update(pos_msg)
@ -839,11 +844,15 @@ async def process_client_order_cmds(
msg = Order(**cmd)
sym = msg.symbol
fqsn = msg.symbol
trigger_price = msg.price
size = msg.size
exec_mode = msg.exec_mode
broker = msg.brokers[0]
# remove the broker part before creating a message
# to send to the specific broker since they probably
# aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '')
if exec_mode == 'live' and action in ('buy', 'sell',):
@ -901,7 +910,7 @@ async def process_client_order_cmds(
# price received from the feed, instead of being
# like every other shitty tina platform that makes
# the user choose the predicate operator.
last = dark_book.lasts[(broker, sym)]
last = dark_book.lasts[fqsn]
pred = mk_check(trigger_price, last, action)
spread_slap: float = 5
@ -932,7 +941,7 @@ async def process_client_order_cmds(
# dark book entry if the order id already exists
dark_book.orders.setdefault(
sym, {}
fqsn, {}
)[oid] = (
pred,
tickfilter,
@ -959,8 +968,8 @@ async def process_client_order_cmds(
async def _emsd_main(
ctx: tractor.Context,
broker: str,
symbol: str,
fqsn: str,
_exec_mode: str = 'dark', # ('paper', 'dark', 'live')
loglevel: str = 'info',
@ -1002,6 +1011,8 @@ async def _emsd_main(
global _router
assert _router
from ..data._source import uncons_fqsn
broker, symbol, suffix = uncons_fqsn(fqsn)
dark_book = _router.get_dark_book(broker)
# TODO: would be nice if in tractor we can require either a ctx arg,
@ -1014,17 +1025,16 @@ async def _emsd_main(
# spawn one task per broker feed
async with (
maybe_open_feed(
broker,
[symbol],
[fqsn],
loglevel=loglevel,
) as (feed, stream),
) as (feed, quote_stream),
):
# XXX: this should be initial price quote from target provider
first_quote = feed.first_quotes[symbol]
first_quote = feed.first_quotes[fqsn]
book = _router.get_dark_book(broker)
last = book.lasts[(broker, symbol)] = first_quote['last']
book.lasts[fqsn] = first_quote['last']
# XXX: ib is a cucker but we've fixed avoiding receiving any
# `Nan`s in the backend during market hours (right?). this was
@ -1053,8 +1063,8 @@ async def _emsd_main(
# flatten out collected pps from brokerd for delivery
pp_msgs = {
sym: list(pps.values())
for sym, pps in relay.positions.items()
fqsn: list(pps.values())
for fqsn, pps in relay.positions.items()
}
# signal to client that we're started and deliver
@ -1071,9 +1081,9 @@ async def _emsd_main(
brokerd_stream,
ems_client_order_stream,
stream,
quote_stream,
broker,
symbol,
fqsn, # form: <name>.<venue>.<suffix>.<broker>
book
)
@ -1089,7 +1099,7 @@ async def _emsd_main(
# relay.brokerd_dialogue,
brokerd_stream,
symbol,
fqsn,
feed,
dark_book,
_router,

View File

@ -268,13 +268,14 @@ class OrderMode:
'''
staged = self._staged_order
symbol = staged.symbol
symbol: Symbol = staged.symbol
oid = str(uuid.uuid4())
# format order data for ems
fqsn = symbol.front_fqsn()
order = staged.copy(
update={
'symbol': symbol.key,
'symbol': fqsn,
'oid': oid,
}
)
@ -519,8 +520,7 @@ async def open_order_mode(
feed: Feed,
chart: 'ChartPlotWidget', # noqa
symbol: Symbol,
brokername: str,
fqsn: str,
started: trio.Event,
) -> None:
@ -546,8 +546,7 @@ async def open_order_mode(
# spawn EMS actor-service
async with (
open_ems(brokername, symbol) as (
open_ems(fqsn) as (
book,
trades_stream,
position_msgs,
@ -556,8 +555,7 @@ async def open_order_mode(
trio.open_nursery() as tn,
):
log.info(f'Opening order mode for {brokername}.{symbol.key}')
log.info(f'Opening order mode for {fqsn}')
view = chart.view
# annotations editors
@ -566,7 +564,7 @@ async def open_order_mode(
# symbol id
symbol = chart.linked.symbol
symkey = symbol.key
symkey = symbol.front_fqsn()
# map of per-provider account keys to position tracker instances
trackers: dict[str, PositionTracker] = {}
@ -610,7 +608,7 @@ async def open_order_mode(
log.info(f'Loading pp for {symkey}:\n{pformat(msg)}')
startup_pp.update_from_msg(msg)
# allocator
# allocator config
alloc = mk_allocator(
symbol=symbol,
account=account_name,
@ -818,8 +816,10 @@ async def process_trades_and_update_ui(
'position',
):
sym = mode.chart.linked.symbol
if msg['symbol'].lower() in sym.key:
symbol = msg['symbol'].lower()
fqsn = sym.front_fqsn()
if symbol in fqsn:
tracker = mode.trackers[msg['account']]
tracker.live_pp.update_from_msg(msg)
# update order pane widgets