Expect fqsn in ems and order mode
Use fqsn as input to the client-side EMS apis but strip broker-name stuff before generating and sending `Brokerd*` msgs to each backend for live order requests (since it's weird for a backend to expect it's own name, though maybe that could be a sanity check?). Summary of fqsn use vs. broker native keys: - client side pps, order requests and general UX for order management use an fqsn for tracking - brokerd side order dialogs use the broker-specific symbol which is usually nearly the same key minus the broker name - internal dark book and quote feed lookups use the fqsn where possiblefqsns
parent
d62a636bcc
commit
c7f3e59105
|
@ -18,7 +18,7 @@
|
||||||
Orders and execution client API.
|
Orders and execution client API.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager as acm
|
||||||
from typing import Dict
|
from typing import Dict
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
@ -27,7 +27,6 @@ import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.trionics import broadcast_receiver
|
from tractor.trionics import broadcast_receiver
|
||||||
|
|
||||||
from ..data._source import Symbol
|
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ._ems import _emsd_main
|
from ._ems import _emsd_main
|
||||||
from .._daemon import maybe_open_emsd
|
from .._daemon import maybe_open_emsd
|
||||||
|
@ -156,16 +155,19 @@ async def relay_order_cmds_from_sync_code(
|
||||||
await to_ems_stream.send(cmd)
|
await to_ems_stream.send(cmd)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@acm
|
||||||
async def open_ems(
|
async def open_ems(
|
||||||
broker: str,
|
fqsn: str,
|
||||||
symbol: Symbol,
|
|
||||||
|
|
||||||
) -> (OrderBook, tractor.MsgStream, dict):
|
) -> (
|
||||||
"""Spawn an EMS daemon and begin sending orders and receiving
|
OrderBook,
|
||||||
|
tractor.MsgStream,
|
||||||
|
dict,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Spawn an EMS daemon and begin sending orders and receiving
|
||||||
alerts.
|
alerts.
|
||||||
|
|
||||||
|
|
||||||
This EMS tries to reduce most broker's terrible order entry apis to
|
This EMS tries to reduce most broker's terrible order entry apis to
|
||||||
a very simple protocol built on a few easy to grok and/or
|
a very simple protocol built on a few easy to grok and/or
|
||||||
"rantsy" premises:
|
"rantsy" premises:
|
||||||
|
@ -194,21 +196,22 @@ async def open_ems(
|
||||||
- 'dark_executed', 'broker_executed'
|
- 'dark_executed', 'broker_executed'
|
||||||
- 'broker_filled'
|
- 'broker_filled'
|
||||||
|
|
||||||
"""
|
'''
|
||||||
# wait for service to connect back to us signalling
|
# wait for service to connect back to us signalling
|
||||||
# ready for order commands
|
# ready for order commands
|
||||||
book = get_orders()
|
book = get_orders()
|
||||||
|
|
||||||
|
from ..data._source import uncons_fqsn
|
||||||
|
broker, symbol, suffix = uncons_fqsn(fqsn)
|
||||||
|
|
||||||
async with maybe_open_emsd(broker) as portal:
|
async with maybe_open_emsd(broker) as portal:
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
|
|
||||||
# connect to emsd
|
# connect to emsd
|
||||||
portal.open_context(
|
portal.open_context(
|
||||||
|
|
||||||
_emsd_main,
|
_emsd_main,
|
||||||
broker=broker,
|
fqsn=fqsn,
|
||||||
symbol=symbol.key,
|
|
||||||
|
|
||||||
) as (ctx, (positions, accounts)),
|
) as (ctx, (positions, accounts)),
|
||||||
|
|
||||||
|
@ -218,7 +221,7 @@ async def open_ems(
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
relay_order_cmds_from_sync_code,
|
relay_order_cmds_from_sync_code,
|
||||||
symbol.key,
|
fqsn,
|
||||||
trades_stream
|
trades_stream
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ In da suit parlances: "Execution management systems"
|
||||||
"""
|
"""
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from math import isnan
|
# from math import isnan
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import time
|
import time
|
||||||
from typing import AsyncIterator, Callable
|
from typing import AsyncIterator, Callable
|
||||||
|
@ -113,8 +113,8 @@ class _DarkBook:
|
||||||
|
|
||||||
# tracks most recent values per symbol each from data feed
|
# tracks most recent values per symbol each from data feed
|
||||||
lasts: dict[
|
lasts: dict[
|
||||||
tuple[str, str],
|
str,
|
||||||
float
|
float,
|
||||||
] = field(default_factory=dict)
|
] = field(default_factory=dict)
|
||||||
|
|
||||||
# mapping of piker ems order ids to current brokerd order flow message
|
# mapping of piker ems order ids to current brokerd order flow message
|
||||||
|
@ -135,7 +135,7 @@ async def clear_dark_triggers(
|
||||||
ems_client_order_stream: tractor.MsgStream,
|
ems_client_order_stream: tractor.MsgStream,
|
||||||
quote_stream: tractor.ReceiveMsgStream, # noqa
|
quote_stream: tractor.ReceiveMsgStream, # noqa
|
||||||
broker: str,
|
broker: str,
|
||||||
symbol: str,
|
fqsn: str,
|
||||||
|
|
||||||
book: _DarkBook,
|
book: _DarkBook,
|
||||||
|
|
||||||
|
@ -155,7 +155,6 @@ async def clear_dark_triggers(
|
||||||
# start = time.time()
|
# start = time.time()
|
||||||
for sym, quote in quotes.items():
|
for sym, quote in quotes.items():
|
||||||
execs = book.orders.get(sym, {})
|
execs = book.orders.get(sym, {})
|
||||||
|
|
||||||
for tick in iterticks(
|
for tick in iterticks(
|
||||||
quote,
|
quote,
|
||||||
# dark order price filter(s)
|
# dark order price filter(s)
|
||||||
|
@ -171,7 +170,7 @@ async def clear_dark_triggers(
|
||||||
ttype = tick['type']
|
ttype = tick['type']
|
||||||
|
|
||||||
# update to keep new cmds informed
|
# update to keep new cmds informed
|
||||||
book.lasts[(broker, symbol)] = price
|
book.lasts[sym] = price
|
||||||
|
|
||||||
for oid, (
|
for oid, (
|
||||||
pred,
|
pred,
|
||||||
|
@ -196,6 +195,7 @@ async def clear_dark_triggers(
|
||||||
|
|
||||||
action: str = cmd['action']
|
action: str = cmd['action']
|
||||||
symbol: str = cmd['symbol']
|
symbol: str = cmd['symbol']
|
||||||
|
bfqsn: str = symbol.replace(f'.{broker}', '')
|
||||||
|
|
||||||
if action == 'alert':
|
if action == 'alert':
|
||||||
# nothing to do but relay a status
|
# nothing to do but relay a status
|
||||||
|
@ -225,7 +225,7 @@ async def clear_dark_triggers(
|
||||||
# order-request and instead create a new one.
|
# order-request and instead create a new one.
|
||||||
reqid=None,
|
reqid=None,
|
||||||
|
|
||||||
symbol=sym,
|
symbol=bfqsn,
|
||||||
price=submit_price,
|
price=submit_price,
|
||||||
size=cmd['size'],
|
size=cmd['size'],
|
||||||
)
|
)
|
||||||
|
@ -247,12 +247,9 @@ async def clear_dark_triggers(
|
||||||
oid=oid, # ems order id
|
oid=oid, # ems order id
|
||||||
resp=resp,
|
resp=resp,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
|
symbol=fqsn,
|
||||||
symbol=symbol,
|
|
||||||
trigger_price=price,
|
trigger_price=price,
|
||||||
|
|
||||||
broker_details={'name': broker},
|
broker_details={'name': broker},
|
||||||
|
|
||||||
cmd=cmd, # original request message
|
cmd=cmd, # original request message
|
||||||
|
|
||||||
).dict()
|
).dict()
|
||||||
|
@ -270,7 +267,7 @@ async def clear_dark_triggers(
|
||||||
else: # condition scan loop complete
|
else: # condition scan loop complete
|
||||||
log.debug(f'execs are {execs}')
|
log.debug(f'execs are {execs}')
|
||||||
if execs:
|
if execs:
|
||||||
book.orders[symbol] = execs
|
book.orders[fqsn] = execs
|
||||||
|
|
||||||
# print(f'execs scan took: {time.time() - start}')
|
# print(f'execs scan took: {time.time() - start}')
|
||||||
|
|
||||||
|
@ -382,7 +379,8 @@ async def open_brokerd_trades_dialogue(
|
||||||
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> tuple[dict, tractor.MsgStream]:
|
) -> tuple[dict, tractor.MsgStream]:
|
||||||
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
'''
|
||||||
|
Open and yield ``brokerd`` trades dialogue context-stream if none
|
||||||
already exists.
|
already exists.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
@ -458,12 +456,13 @@ async def open_brokerd_trades_dialogue(
|
||||||
# locally cache and track positions per account.
|
# locally cache and track positions per account.
|
||||||
pps = {}
|
pps = {}
|
||||||
for msg in positions:
|
for msg in positions:
|
||||||
|
log.info(f'loading pp: {msg}')
|
||||||
|
|
||||||
account = msg['account']
|
account = msg['account']
|
||||||
assert account in accounts
|
assert account in accounts
|
||||||
|
|
||||||
pps.setdefault(
|
pps.setdefault(
|
||||||
msg['symbol'],
|
f'{msg["symbol"]}.{broker}',
|
||||||
{}
|
{}
|
||||||
)[account] = msg
|
)[account] = msg
|
||||||
|
|
||||||
|
@ -563,7 +562,13 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
# XXX: this will be useful for automatic strats yah?
|
# XXX: this will be useful for automatic strats yah?
|
||||||
# keep pps per account up to date locally in ``emsd`` mem
|
# keep pps per account up to date locally in ``emsd`` mem
|
||||||
relay.positions.setdefault(pos_msg['symbol'], {}).setdefault(
|
sym, broker = pos_msg['symbol'], pos_msg['broker']
|
||||||
|
|
||||||
|
relay.positions.setdefault(
|
||||||
|
# NOTE: translate to a FQSN!
|
||||||
|
f'{sym}.{broker}',
|
||||||
|
{}
|
||||||
|
).setdefault(
|
||||||
pos_msg['account'], {}
|
pos_msg['account'], {}
|
||||||
).update(pos_msg)
|
).update(pos_msg)
|
||||||
|
|
||||||
|
@ -840,11 +845,15 @@ async def process_client_order_cmds(
|
||||||
|
|
||||||
msg = Order(**cmd)
|
msg = Order(**cmd)
|
||||||
|
|
||||||
sym = msg.symbol
|
fqsn = msg.symbol
|
||||||
trigger_price = msg.price
|
trigger_price = msg.price
|
||||||
size = msg.size
|
size = msg.size
|
||||||
exec_mode = msg.exec_mode
|
exec_mode = msg.exec_mode
|
||||||
broker = msg.brokers[0]
|
broker = msg.brokers[0]
|
||||||
|
# remove the broker part before creating a message
|
||||||
|
# to send to the specific broker since they probably
|
||||||
|
# aren't expectig their own name, but should they?
|
||||||
|
sym = fqsn.replace(f'.{broker}', '')
|
||||||
|
|
||||||
if exec_mode == 'live' and action in ('buy', 'sell',):
|
if exec_mode == 'live' and action in ('buy', 'sell',):
|
||||||
|
|
||||||
|
@ -902,7 +911,7 @@ async def process_client_order_cmds(
|
||||||
# price received from the feed, instead of being
|
# price received from the feed, instead of being
|
||||||
# like every other shitty tina platform that makes
|
# like every other shitty tina platform that makes
|
||||||
# the user choose the predicate operator.
|
# the user choose the predicate operator.
|
||||||
last = dark_book.lasts[(broker, sym)]
|
last = dark_book.lasts[fqsn]
|
||||||
pred = mk_check(trigger_price, last, action)
|
pred = mk_check(trigger_price, last, action)
|
||||||
|
|
||||||
spread_slap: float = 5
|
spread_slap: float = 5
|
||||||
|
@ -933,7 +942,7 @@ async def process_client_order_cmds(
|
||||||
# dark book entry if the order id already exists
|
# dark book entry if the order id already exists
|
||||||
|
|
||||||
dark_book.orders.setdefault(
|
dark_book.orders.setdefault(
|
||||||
sym, {}
|
fqsn, {}
|
||||||
)[oid] = (
|
)[oid] = (
|
||||||
pred,
|
pred,
|
||||||
tickfilter,
|
tickfilter,
|
||||||
|
@ -960,8 +969,8 @@ async def process_client_order_cmds(
|
||||||
async def _emsd_main(
|
async def _emsd_main(
|
||||||
|
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
broker: str,
|
fqsn: str,
|
||||||
symbol: str,
|
|
||||||
_exec_mode: str = 'dark', # ('paper', 'dark', 'live')
|
_exec_mode: str = 'dark', # ('paper', 'dark', 'live')
|
||||||
loglevel: str = 'info',
|
loglevel: str = 'info',
|
||||||
|
|
||||||
|
@ -1003,6 +1012,8 @@ async def _emsd_main(
|
||||||
global _router
|
global _router
|
||||||
assert _router
|
assert _router
|
||||||
|
|
||||||
|
from ..data._source import uncons_fqsn
|
||||||
|
broker, symbol, suffix = uncons_fqsn(fqsn)
|
||||||
dark_book = _router.get_dark_book(broker)
|
dark_book = _router.get_dark_book(broker)
|
||||||
|
|
||||||
# TODO: would be nice if in tractor we can require either a ctx arg,
|
# TODO: would be nice if in tractor we can require either a ctx arg,
|
||||||
|
@ -1015,17 +1026,16 @@ async def _emsd_main(
|
||||||
# spawn one task per broker feed
|
# spawn one task per broker feed
|
||||||
async with (
|
async with (
|
||||||
maybe_open_feed(
|
maybe_open_feed(
|
||||||
broker,
|
[fqsn],
|
||||||
[symbol],
|
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
) as (feed, stream),
|
) as (feed, quote_stream),
|
||||||
):
|
):
|
||||||
|
|
||||||
# XXX: this should be initial price quote from target provider
|
# XXX: this should be initial price quote from target provider
|
||||||
first_quote = feed.first_quotes[symbol]
|
first_quote = feed.first_quotes[fqsn]
|
||||||
|
|
||||||
book = _router.get_dark_book(broker)
|
book = _router.get_dark_book(broker)
|
||||||
last = book.lasts[(broker, symbol)] = first_quote['last']
|
book.lasts[fqsn] = first_quote['last']
|
||||||
|
|
||||||
# XXX: ib is a cucker but we've fixed avoiding receiving any
|
# XXX: ib is a cucker but we've fixed avoiding receiving any
|
||||||
# `Nan`s in the backend during market hours (right?). this was
|
# `Nan`s in the backend during market hours (right?). this was
|
||||||
|
@ -1054,8 +1064,8 @@ async def _emsd_main(
|
||||||
|
|
||||||
# flatten out collected pps from brokerd for delivery
|
# flatten out collected pps from brokerd for delivery
|
||||||
pp_msgs = {
|
pp_msgs = {
|
||||||
sym: list(pps.values())
|
fqsn: list(pps.values())
|
||||||
for sym, pps in relay.positions.items()
|
for fqsn, pps in relay.positions.items()
|
||||||
}
|
}
|
||||||
|
|
||||||
# signal to client that we're started and deliver
|
# signal to client that we're started and deliver
|
||||||
|
@ -1072,9 +1082,9 @@ async def _emsd_main(
|
||||||
|
|
||||||
brokerd_stream,
|
brokerd_stream,
|
||||||
ems_client_order_stream,
|
ems_client_order_stream,
|
||||||
stream,
|
quote_stream,
|
||||||
broker,
|
broker,
|
||||||
symbol,
|
fqsn, # form: <name>.<venue>.<suffix>.<broker>
|
||||||
book
|
book
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1090,7 +1100,7 @@ async def _emsd_main(
|
||||||
# relay.brokerd_dialogue,
|
# relay.brokerd_dialogue,
|
||||||
brokerd_stream,
|
brokerd_stream,
|
||||||
|
|
||||||
symbol,
|
fqsn,
|
||||||
feed,
|
feed,
|
||||||
dark_book,
|
dark_book,
|
||||||
_router,
|
_router,
|
||||||
|
|
|
@ -268,13 +268,14 @@ class OrderMode:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
staged = self._staged_order
|
staged = self._staged_order
|
||||||
symbol = staged.symbol
|
symbol: Symbol = staged.symbol
|
||||||
oid = str(uuid.uuid4())
|
oid = str(uuid.uuid4())
|
||||||
|
|
||||||
# format order data for ems
|
# format order data for ems
|
||||||
|
fqsn = symbol.front_fqsn()
|
||||||
order = staged.copy(
|
order = staged.copy(
|
||||||
update={
|
update={
|
||||||
'symbol': symbol.key,
|
'symbol': fqsn,
|
||||||
'oid': oid,
|
'oid': oid,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -519,8 +520,7 @@ async def open_order_mode(
|
||||||
|
|
||||||
feed: Feed,
|
feed: Feed,
|
||||||
chart: 'ChartPlotWidget', # noqa
|
chart: 'ChartPlotWidget', # noqa
|
||||||
symbol: Symbol,
|
fqsn: str,
|
||||||
brokername: str,
|
|
||||||
started: trio.Event,
|
started: trio.Event,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -546,8 +546,7 @@ async def open_order_mode(
|
||||||
|
|
||||||
# spawn EMS actor-service
|
# spawn EMS actor-service
|
||||||
async with (
|
async with (
|
||||||
|
open_ems(fqsn) as (
|
||||||
open_ems(brokername, symbol) as (
|
|
||||||
book,
|
book,
|
||||||
trades_stream,
|
trades_stream,
|
||||||
position_msgs,
|
position_msgs,
|
||||||
|
@ -556,8 +555,7 @@ async def open_order_mode(
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
|
|
||||||
):
|
):
|
||||||
log.info(f'Opening order mode for {brokername}.{symbol.key}')
|
log.info(f'Opening order mode for {fqsn}')
|
||||||
|
|
||||||
view = chart.view
|
view = chart.view
|
||||||
|
|
||||||
# annotations editors
|
# annotations editors
|
||||||
|
@ -566,7 +564,7 @@ async def open_order_mode(
|
||||||
|
|
||||||
# symbol id
|
# symbol id
|
||||||
symbol = chart.linked.symbol
|
symbol = chart.linked.symbol
|
||||||
symkey = symbol.key
|
symkey = symbol.front_fqsn()
|
||||||
|
|
||||||
# map of per-provider account keys to position tracker instances
|
# map of per-provider account keys to position tracker instances
|
||||||
trackers: dict[str, PositionTracker] = {}
|
trackers: dict[str, PositionTracker] = {}
|
||||||
|
@ -610,7 +608,7 @@ async def open_order_mode(
|
||||||
log.info(f'Loading pp for {symkey}:\n{pformat(msg)}')
|
log.info(f'Loading pp for {symkey}:\n{pformat(msg)}')
|
||||||
startup_pp.update_from_msg(msg)
|
startup_pp.update_from_msg(msg)
|
||||||
|
|
||||||
# allocator
|
# allocator config
|
||||||
alloc = mk_allocator(
|
alloc = mk_allocator(
|
||||||
symbol=symbol,
|
symbol=symbol,
|
||||||
account=account_name,
|
account=account_name,
|
||||||
|
@ -818,8 +816,10 @@ async def process_trades_and_update_ui(
|
||||||
'position',
|
'position',
|
||||||
):
|
):
|
||||||
sym = mode.chart.linked.symbol
|
sym = mode.chart.linked.symbol
|
||||||
if msg['symbol'].lower() in sym.key:
|
symbol = msg['symbol'].lower()
|
||||||
|
fqsn = sym.front_fqsn()
|
||||||
|
|
||||||
|
if symbol in fqsn:
|
||||||
tracker = mode.trackers[msg['account']]
|
tracker = mode.trackers[msg['account']]
|
||||||
tracker.live_pp.update_from_msg(msg)
|
tracker.live_pp.update_from_msg(msg)
|
||||||
# update order pane widgets
|
# update order pane widgets
|
||||||
|
|
Loading…
Reference in New Issue