Merge pull request #374 from pikers/open_order_loading

Open order loading
asycvnc_pin_bump
goodboy 2022-08-19 15:23:49 -04:00 committed by GitHub
commit cf5b0bf9c6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 1201 additions and 728 deletions

View File

@ -60,6 +60,8 @@ from piker.pp import (
)
from piker.log import get_console_log
from piker.clearing._messages import (
Order,
Status,
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
@ -122,11 +124,13 @@ async def handle_order_requests(
f'An IB account number for name {account} is not found?\n'
'Make sure you have all TWS and GW instances running.'
)
await ems_order_stream.send(BrokerdError(
await ems_order_stream.send(
BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
))
)
)
continue
client = _accounts2clients.get(account)
@ -146,6 +150,14 @@ async def handle_order_requests(
# validate
order = BrokerdOrder(**request_msg)
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid = order.reqid
if reqid is not None:
reqid = int(reqid)
# call our client api to submit the order
reqid = client.submit_limit(
oid=order.oid,
@ -154,12 +166,7 @@ async def handle_order_requests(
action=order.action,
size=order.size,
account=acct_number,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid=order.reqid,
reqid=reqid,
)
if reqid is None:
await ems_order_stream.send(BrokerdError(
@ -181,7 +188,7 @@ async def handle_order_requests(
elif action == 'cancel':
msg = BrokerdCancel(**request_msg)
client.submit_cancel(reqid=msg.reqid)
client.submit_cancel(reqid=int(msg.reqid))
else:
log.error(f'Unknown order command: {request_msg}')
@ -451,7 +458,6 @@ async def trades_dialogue(
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
async with (
# trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients),
):
# Open a trade ledgers stack for appending trade records over
@ -459,6 +465,7 @@ async def trades_dialogue(
# TODO: we probably want to generalize this into a "ledgers" api..
ledgers: dict[str, dict] = {}
tables: dict[str, PpTable] = {}
order_msgs: list[Status] = []
with (
ExitStack() as lstack,
):
@ -480,6 +487,49 @@ async def trades_dialogue(
for account, proxy in proxies.items():
client = aioclients[account]
trades: list[Trade] = client.ib.openTrades()
for trade in trades:
order = trade.order
quant = trade.order.totalQuantity
action = order.action.lower()
size = {
'sell': -1,
'buy': 1,
}[action] * quant
con = trade.contract
# TODO: in the case of the SMART venue (aka ib's
# router-clearing sys) we probably should handle
# showing such orders overtop of the fqsn for the
# primary exchange, how to map this easily is going
# to be a bit tricky though?
deats = await proxy.con_deats(contracts=[con])
fqsn = list(deats)[0]
reqid = order.orderId
# TODO: maybe embed a ``BrokerdOrder`` instead
# since then we can directly load it on the client
# side in the order mode loop?
msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=str(reqid),
reqid=reqid,
# embedded order info
req=Order(
action=action,
exec_mode='live',
oid=str(reqid),
symbol=fqsn,
account=accounts_def.inverse[order.account],
price=order.lmtPrice,
size=size,
),
src='ib',
)
order_msgs.append(msg)
# process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses
@ -615,6 +665,9 @@ async def trades_dialogue(
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# relay existing open orders to ems
for msg in order_msgs:
await ems_stream.send(msg)
for client in set(aioclients.values()):
trade_event_stream = await n.start(
@ -633,6 +686,7 @@ async def trades_dialogue(
# allocate event relay tasks for each client connection
n.start_soon(
deliver_trade_events,
n,
trade_event_stream,
ems_stream,
accounts_def,
@ -726,6 +780,7 @@ _statuses: dict[str, str] = {
async def deliver_trade_events(
nurse: trio.Nursery,
trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream,
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
@ -750,8 +805,9 @@ async def deliver_trade_events(
log.info(f'ib sending {event_name}:\n{pformat(item)}')
match event_name:
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# NOTE: we remap statuses to the ems set via the
# ``_statuses: dict`` above.
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
@ -781,28 +837,90 @@ async def deliver_trade_events(
# unwrap needed data from ib_insync internal types
trade: Trade = item
status: OrderStatus = trade.orderStatus
ib_status_key = status.status.lower()
# TODO: try out cancelling inactive orders after delay:
# https://github.com/erdewit/ib_insync/issues/363
# acctid = accounts_def.inverse[trade.order.account]
# # double check there is no error when
# # cancelling.. gawwwd
# if ib_status_key == 'cancelled':
# last_log = trade.log[-1]
# if (
# last_log.message
# and 'Error' not in last_log.message
# ):
# ib_status_key = trade.log[-2].status
# elif ib_status_key == 'inactive':
# async def sched_cancel():
# log.warning(
# 'OH GAWD an inactive order.scheduling a cancel\n'
# f'{pformat(item)}'
# )
# proxy = proxies[acctid]
# await proxy.submit_cancel(reqid=trade.order.orderId)
# await trio.sleep(1)
# nurse.start_soon(sched_cancel)
# nurse.start_soon(sched_cancel)
status_key = (
_statuses.get(ib_status_key.lower())
or ib_status_key.lower()
)
remaining = status.remaining
if (
status_key == 'filled'
):
fill: Fill = trade.fills[-1]
execu: Execution = fill.execution
# execdict = asdict(execu)
# execdict.pop('acctNumber')
fill_msg = BrokerdFill(
# should match the value returned from
# `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
action=action_map[execu.side],
size=execu.shares,
price=execu.price,
# broker_details=execdict,
# XXX: required by order mode currently
broker_time=execu.time,
)
await ems_stream.send(fill_msg)
if remaining == 0:
# emit a closed status on filled statuses where
# all units were cleared.
status_key = 'closed'
# skip duplicate filled updates - we get the deats
# from the execution details event
msg = BrokerdStatus(
reqid=trade.order.orderId,
time_ns=time.time_ns(), # cuz why not
account=accounts_def.inverse[trade.order.account],
# everyone doin camel case..
status=status.status.lower(), # force lower case
status=status_key, # force lower case
filled=status.filled,
reason=status.whyHeld,
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
remaining=status.remaining,
remaining=remaining,
broker_details={'name': 'ib'},
)
await ems_stream.send(msg)
continue
case 'fill':
@ -818,8 +936,6 @@ async def deliver_trade_events(
# https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations
trade: Trade
fill: Fill
# TODO: maybe we can use matching to better handle these cases.
trade, fill = item
execu: Execution = fill.execution
execid = execu.execId
@ -848,22 +964,6 @@ async def deliver_trade_events(
}
)
msg = BrokerdFill(
# should match the value returned from `.submit_limit()`
reqid=execu.orderId,
time_ns=time.time_ns(), # cuz why not
action=action_map[execu.side],
size=execu.shares,
price=execu.price,
broker_details=trade_entry,
# XXX: required by order mode currently
broker_time=trade_entry['broker_time'],
)
await ems_stream.send(msg)
# 2 cases:
# - fill comes first or
# - comms report comes first
@ -933,17 +1033,25 @@ async def deliver_trade_events(
if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}')
# TODO: what schema for this msg if we're going to make it
# portable across all backends?
# msg = BrokerdError(**err)
# TODO: we don't want to relay data feed / lookup errors
# so we need some further filtering logic here..
# for most cases the 'status' block above should take
# care of this.
# await ems_stream.send(BrokerdStatus(
# status='error',
# reqid=err['reqid'],
# reason=err['reason'],
# time_ns=time.time_ns(),
# account=accounts_def.inverse[trade.order.account],
# broker_details={'name': 'ib'},
# ))
case 'position':
cid, msg = pack_position(item)
log.info(f'New IB position msg: {msg}')
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg)
continue
case 'event':

View File

@ -31,6 +31,7 @@ import time
from typing import (
Any,
AsyncIterator,
Iterable,
Union,
)
@ -39,7 +40,6 @@ from bidict import bidict
import pendulum
import trio
import tractor
import wsproto
from piker.pp import (
Position,
@ -49,6 +49,8 @@ from piker.pp import (
open_pps,
)
from piker.clearing._messages import (
Order,
Status,
BrokerdCancel,
BrokerdError,
BrokerdFill,
@ -85,6 +87,33 @@ class TooFastEdit(Exception):
'Edit requests faster then api submissions'
# TODO: make this wrap the `Client` and `ws` instances
# and give it methods to submit cancel vs. add vs. edit
# requests?
class BrokerClient:
'''
Actor global, client-unique order manager API.
For now provides unique ``brokerd`` defined "request ids"
and "user reference" values to track ``kraken`` ws api order
dialogs.
'''
counter: Iterable = count(1)
_table: set[int] = set()
@classmethod
def new_reqid(cls) -> int:
for reqid in cls.counter:
if reqid not in cls._table:
cls._table.add(reqid)
return reqid
@classmethod
def add_reqid(cls, reqid: int) -> None:
cls._table.add(reqid)
async def handle_order_requests(
ws: NoBsWs,
@ -104,7 +133,6 @@ async def handle_order_requests(
# XXX: UGH, let's unify this.. with ``msgspec``.
msg: dict[str, Any]
order: BrokerdOrder
counter = count(1)
async for msg in ems_order_stream:
log.info(f'Rx order msg:\n{pformat(msg)}')
@ -126,7 +154,7 @@ async def handle_order_requests(
oid=msg['oid'],
symbol=msg['symbol'],
reason=(
f'TooFastEdit reqid:{reqid}, could not cancelling..'
f'Edit too fast:{reqid}, cancelling..'
),
)
@ -177,7 +205,8 @@ async def handle_order_requests(
else:
ep = 'addOrder'
reqid = next(counter)
reqid = BrokerClient.new_reqid()
ids[order.oid] = reqid
log.debug(
f"Adding order {reqid}\n"
@ -249,7 +278,7 @@ async def handle_order_requests(
@acm
async def subscribe(
ws: wsproto.WSConnection,
ws: NoBsWs,
token: str,
subs: list[tuple[str, dict]] = [
('ownTrades', {
@ -632,8 +661,6 @@ async def handle_order_updates(
# to do all fill/status/pp updates in that sub and just use
# this one for ledger syncs?
# XXX: ASK SUPPORT ABOUT THIS!
# For eg. we could take the "last 50 trades" and do a diff
# with the ledger and then only do a re-sync if something
# seems amiss?
@ -696,7 +723,6 @@ async def handle_order_updates(
status_msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
account=acc_name,
status='filled',
filled=size,
@ -741,7 +767,6 @@ async def handle_order_updates(
f'{pformat(order_msg)}'
)
txid, update_msg = list(order_msg.items())[0]
match update_msg:
# XXX: eg. of full msg schema:
# {'avg_price': _,
@ -768,6 +793,66 @@ async def handle_order_updates(
# 'timeinforce': 'GTC',
# 'vol': submit_vlm, # '13.34400854',
# 'vol_exec': exec_vlm} # 0.0000
match update_msg:
# EMS-unknown live order that needs to be
# delivered and loaded on the client-side.
case {
'userref': reqid,
'descr': {
'pair': pair,
'price': price,
'type': action,
},
'vol': vol,
# during a fill this field is **not**
# provided! but, it is always avail on
# actual status updates.. see case above.
'status': status,
**rest,
} if (
ids.inverse.get(reqid) is None
):
# parse out existing live order
fqsn = pair.replace('/', '').lower()
price = float(price)
size = float(vol)
# register the userref value from
# kraken (usually an `int` staring
# at 1?) as our reqid.
reqids2txids[reqid] = txid
oid = str(reqid)
ids[oid] = reqid # NOTE!: str -> int
# ensure wtv reqid they give us we don't re-use on
# new order submissions to this actor's client.
BrokerClient.add_reqid(reqid)
# fill out ``Status`` + boxed ``Order``
status_msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=oid,
reqid=reqid,
# embedded order info
req=Order(
action=action,
exec_mode='live',
oid=oid,
symbol=fqsn,
account=acc_name,
price=price,
size=size,
),
src='kraken',
)
apiflows[reqid].maps.append(status_msg)
await ems_stream.send(status_msg)
continue
case {
'userref': reqid,
@ -821,10 +906,6 @@ async def handle_order_updates(
)
oid = ids.inverse.get(reqid)
if (
status == 'open'
and (
# XXX: too fast edit handled by the
# request handler task: this
# scenario occurs when ems side
@ -850,17 +931,13 @@ async def handle_order_updates(
# coming in too quickly on the other
# side of the ems, aka the client
# <-> ems dialog.
(toofast := isinstance(
if (
status == 'open'
and isinstance(
reqids2txids.get(reqid),
TooFastEdit
))
# pre-existing open order NOT from
# this EMS session.
or (noid := oid is None)
)
):
if toofast:
# TODO: don't even allow this case
# by not moving the client side line
# until an edit confirmation
@ -870,17 +947,6 @@ async def handle_order_updates(
f'{update_msg}\n'
'Cancelling order for now!..'
)
elif noid: # a non-ems-active order
# TODO: handle these and relay them
# through the EMS to the client / UI
# side!
log.cancel(
f'Rx unknown active order {txid}:\n'
f'{update_msg}\n'
'Cancelling order for now!..'
)
# call ws api to cancel:
# https://docs.kraken.com/websockets/#message-cancelOrder
await ws.send_msg({
@ -891,18 +957,6 @@ async def handle_order_updates(
})
continue
# remap statuses to ems set.
ems_status = {
'open': 'submitted',
'closed': 'filled',
'canceled': 'cancelled',
# do we even need to forward
# this state to the ems?
'pending': 'pending',
}[status]
# TODO: i like the open / closed semantics
# more we should consider them for internals
# send BrokerdStatus messages for all
# order state updates
resp = BrokerdStatus(
@ -912,7 +966,7 @@ async def handle_order_updates(
account=f'kraken.{acctid}',
# everyone doin camel case..
status=ems_status, # force lower case
status=status, # force lower case
filled=vlm,
reason='', # why held?

View File

@ -34,7 +34,6 @@ import pendulum
from trio_typing import TaskStatus
import tractor
import trio
import wsproto
from piker._cacheables import open_cached_client
from piker.brokers._util import (
@ -243,22 +242,6 @@ def normalize(
return topic, quote
def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]:
'''
Create a request subscription packet dict.
https://docs.kraken.com/websockets/#message-subscribe
'''
# eg. specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
return {
'pair': pairs,
'event': 'subscribe',
'subscription': data,
}
@acm
async def open_history_client(
symbol: str,
@ -381,15 +364,20 @@ async def stream_quotes(
}
@acm
async def subscribe(ws: wsproto.WSConnection):
async def subscribe(ws: NoBsWs):
# XXX: setup subs
# https://docs.kraken.com/websockets/#message-subscribe
# specific logic for this in kraken's shitty sync client:
# specific logic for this in kraken's sync client:
# https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188
ohlc_sub = make_sub(
list(ws_pairs.values()),
{'name': 'ohlc', 'interval': 1}
)
ohlc_sub = {
'event': 'subscribe',
'pair': list(ws_pairs.values()),
'subscription': {
'name': 'ohlc',
'interval': 1,
},
}
# TODO: we want to eventually allow unsubs which should
# be completely fine to request from a separate task
@ -398,10 +386,14 @@ async def stream_quotes(
await ws.send_msg(ohlc_sub)
# trade data (aka L1)
l1_sub = make_sub(
list(ws_pairs.values()),
{'name': 'spread'} # 'depth': 10}
)
l1_sub = {
'event': 'subscribe',
'pair': list(ws_pairs.values()),
'subscription': {
'name': 'spread',
# 'depth': 10}
},
}
# pull a first quote and deliver
await ws.send_msg(l1_sub)

View File

@ -83,7 +83,13 @@ class OrderBook:
"""Cancel an order (or alert) in the EMS.
"""
cmd = self._sent_orders[uuid]
cmd = self._sent_orders.get(uuid)
if not cmd:
log.error(
f'Unknown order {uuid}!?\n'
f'Maybe there is a stale entry or line?\n'
f'You should report this as a bug!'
)
msg = Cancel(
oid=uuid,
symbol=cmd.symbol,
@ -149,10 +155,17 @@ async def relay_order_cmds_from_sync_code(
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
sym = cmd.symbol
msg = pformat(cmd)
if sym == symbol_key:
log.info(f'Send order cmd:\n{msg}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)
else:
log.warning(
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
f'\n{msg}'
)
@acm
@ -220,11 +233,19 @@ async def open_ems(
fqsn=fqsn,
exec_mode=mode,
) as (ctx, (positions, accounts)),
) as (
ctx,
(
positions,
accounts,
dialogs,
)
),
# open 2-way trade command stream
ctx.open_stream() as trades_stream,
):
# start sync code order msg delivery task
async with trio.open_nursery() as n:
n.start_soon(
relay_order_cmds_from_sync_code,
@ -232,4 +253,10 @@ async def open_ems(
trades_stream
)
yield book, trades_stream, positions, accounts
yield (
book,
trades_stream,
positions,
accounts,
dialogs,
)

View File

@ -18,8 +18,8 @@
In da suit parlances: "Execution management systems"
"""
from collections import defaultdict, ChainMap
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
@ -27,6 +27,7 @@ from typing import (
AsyncIterator,
Any,
Callable,
Optional,
)
from bidict import bidict
@ -41,9 +42,16 @@ from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
Status, Order,
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill, BrokerdError, BrokerdPosition,
Order,
Status,
# Cancel,
BrokerdCancel,
BrokerdOrder,
# BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdError,
BrokerdPosition,
)
@ -90,8 +98,7 @@ def mk_check(
)
@dataclass
class _DarkBook:
class _DarkBook(Struct):
'''
EMS-trigger execution book.
@ -116,17 +123,24 @@ class _DarkBook:
dict, # cmd / msg type
]
]
] = field(default_factory=dict)
] = {}
# tracks most recent values per symbol each from data feed
lasts: dict[
str,
float,
] = field(default_factory=dict)
] = {}
# mapping of piker ems order ids to current brokerd order flow message
_ems_entries: dict[str, str] = field(default_factory=dict)
_ems2brokerd_ids: dict[str, str] = field(default_factory=bidict)
# _ems_entries: dict[str, str] = {}
_active: dict = {}
# mapping of ems dialog ids to msg flow history
_msgflows: defaultdict[
int,
ChainMap[dict[str, dict]],
] = defaultdict(ChainMap)
_ems2brokerd_ids: dict[str, str] = bidict()
# XXX: this is in place to prevent accidental positions that are too
@ -181,6 +195,7 @@ async def clear_dark_triggers(
for oid, (
pred,
tf,
# TODO: send this msg instead?
cmd,
percent_away,
abs_diff_away
@ -188,9 +203,9 @@ async def clear_dark_triggers(
tuple(execs.items())
):
if (
not pred or
ttype not in tf or
not pred(price)
not pred
or ttype not in tf
or not pred(price)
):
# log.runtime(
# f'skipping quote for {sym} '
@ -200,30 +215,29 @@ async def clear_dark_triggers(
# majority of iterations will be non-matches
continue
brokerd_msg: Optional[BrokerdOrder] = None
match cmd:
# alert: nothing to do but relay a status
# back to the requesting ems client
case {
'action': 'alert',
}:
resp = 'alert_triggered'
case Order(action='alert'):
resp = 'triggered'
# executable order submission
case {
'action': action,
'symbol': symbol,
'account': account,
'size': size,
}:
case Order(
action=action,
symbol=symbol,
account=account,
size=size,
):
bfqsn: str = symbol.replace(f'.{broker}', '')
submit_price = price + abs_diff_away
resp = 'dark_triggered' # hidden on client-side
resp = 'triggered' # hidden on client-side
log.info(
f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}')
live_req = BrokerdOrder(
brokerd_msg = BrokerdOrder(
action=action,
oid=oid,
account=account,
@ -232,26 +246,22 @@ async def clear_dark_triggers(
price=submit_price,
size=size,
)
await brokerd_orders_stream.send(live_req)
# mark this entry as having sent an order
# request. the entry will be replaced once the
# target broker replies back with
# a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems.
book._ems_entries[oid] = live_req
await brokerd_orders_stream.send(brokerd_msg)
# book._ems_entries[oid] = live_req
# book._msgflows[oid].maps.insert(0, live_req)
case _:
raise ValueError(f'Invalid dark book entry: {cmd}')
# fallthrough logic
resp = Status(
status = Status(
oid=oid, # ems dialog id
time_ns=time.time_ns(),
resp=resp,
trigger_price=price,
brokerd_msg=cmd,
req=cmd,
brokerd_msg=brokerd_msg,
)
# remove exec-condition from set
@ -262,9 +272,24 @@ async def clear_dark_triggers(
f'pred for {oid} was already removed!?'
)
# update actives
# mark this entry as having sent an order
# request. the entry will be replaced once the
# target broker replies back with
# a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems.
if cmd.action == 'alert':
# don't register the alert status (so it won't
# be reloaded by clients) since it's now
# complete / closed.
book._active.pop(oid)
else:
book._active[oid] = status
# send response to client-side
try:
await ems_client_order_stream.send(resp)
await ems_client_order_stream.send(status)
except (
trio.ClosedResourceError,
):
@ -281,8 +306,7 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}')
@dataclass
class TradesRelay:
class TradesRelay(Struct):
# for now we keep only a single connection open with
# each ``brokerd`` for simplicity.
@ -318,7 +342,10 @@ class Router(Struct):
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {}
dialogues: dict[
str,
list[tractor.MsgStream]
] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
@ -341,11 +368,12 @@ class Router(Struct):
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
Open and yield ``brokerd`` trades dialogue context-stream if
none already exists.
'''
relay = self.relays.get(feed.mod.name)
relay: TradesRelay = self.relays.get(feed.mod.name)
if (
relay is None
@ -381,6 +409,22 @@ class Router(Struct):
relay.consumers -= 1
async def client_broadcast(
self,
msg: dict,
) -> None:
for client_stream in self.clients.copy():
try:
await client_stream.send(msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
self.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
_router: Router = None
@ -452,7 +496,6 @@ async def open_brokerd_trades_dialogue(
async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
@ -502,14 +545,9 @@ async def open_brokerd_trades_dialogue(
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
await trio.sleep_forever()
finally:
# parent context must have been closed
@ -561,15 +599,14 @@ async def translate_and_relay_brokerd_events(
broker ems
'error' -> log it locally (for now)
'status' -> relabel as 'broker_<status>', if complete send 'executed'
'fill' -> 'broker_filled'
('status' | 'fill'} -> relayed through see ``Status`` msg type.
Currently handled status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
'''
book = router.get_dark_book(broker)
relay = router.relays[broker]
book: _DarkBook = router.get_dark_book(broker)
relay: TradesRelay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
@ -601,30 +638,16 @@ async def translate_and_relay_brokerd_events(
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(pos_msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
await router.client_broadcast(pos_msg)
continue
# BrokerdOrderAck
# initial response to brokerd order request
case {
'name': 'ack',
'reqid': reqid, # brokerd generated order-request id
'oid': oid, # ems order-dialog id
} if (
entry := book._ems_entries.get(oid)
):
# initial response to brokerd order request
# if name == 'ack':
}:
# register the brokerd request id (that was generated
# / created internally by the broker backend) with our
# local ems order id for reverse lookup later.
@ -639,23 +662,23 @@ async def translate_and_relay_brokerd_events(
# new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases:
# - the order has previously been requested to be
# 1. the order has previously been requested to be
# cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
action = getattr(entry, 'action', None)
if action and action == 'cancel':
status_msg = book._active[oid]
req = status_msg.req
if req and req.action == 'cancel':
# assign newly providerd broker backend request id
entry.reqid = reqid
# and tell broker to cancel immediately
status_msg.reqid = reqid
await brokerd_trades_stream.send(req)
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# 2. the order is now active and will be mirrored in
# our book -> registered as live flow
else:
# update the flow with the ack msg
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
# TODO: should we relay this ack state?
status_msg.resp = 'pending'
# no msg to client necessary
continue
@ -666,11 +689,9 @@ async def translate_and_relay_brokerd_events(
'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id
'symbol': sym,
'broker_details': details,
# 'reason': reason,
}:
} if status_msg := book._active.get(oid):
msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank?
# TODO: figure out how this will interact with EMS clients
@ -680,43 +701,61 @@ async def translate_and_relay_brokerd_events(
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
ems_client_order_stream = router.dialogues[oid]
status_msg.resp = 'error'
status_msg.brokerd_msg = msg
book._active[oid] = status_msg
await ems_client_order_stream.send(status_msg)
# BrokerdStatus
case {
'name': 'status',
'status': status,
'reqid': reqid, # brokerd generated order-request id
# TODO: feels like the wrong msg for this field?
'remaining': remaining,
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
(oid := book._ems2brokerd_ids.inverse.get(reqid))
and status in (
'canceled',
'open',
'closed',
)
):
msg = BrokerdStatus(**brokerd_msg)
# TODO: should we flatten out these cases and/or should
# they maybe even eventually be separate messages?
if status == 'cancelled':
log.info(f'Cancellation for {oid} is complete!')
# TODO: maybe pack this into a composite type that
# contains both the IPC stream as well the
# msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
status_msg.resp = status
if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not remaining:
# retrieve existing live flow
old_reqid = status_msg.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid}:{type(old_reqid)} ->'
f' {reqid}{type(reqid)}'
)
resp = 'broker_executed'
status_msg.reqid = reqid # THIS LINE IS CRITICAL!
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
await ems_client_order_stream.send(status_msg)
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
if status == 'closed':
log.info(f'Execution for {oid} is complete!')
status_msg = book._active.pop(oid)
elif status == 'canceled':
log.cancel(f'Cancellation for {oid} is complete!')
status_msg = book._active.pop(oid)
else: # open
# relayed from backend but probably not handled so
# just log it
else:
log.info(f'{broker} filled {msg}')
else:
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
log.info(f'{broker} opened order {msg}')
# BrokerdFill
case {
@ -728,82 +767,112 @@ async def translate_and_relay_brokerd_events(
):
# proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg)
resp = 'broker_filled'
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
# unknown valid message case?
# case {
# 'name': name,
# 'symbol': sym,
# 'reqid': reqid, # brokerd generated order-request id
# # 'oid': oid, # ems order-dialog id
# 'broker_details': details,
ems_client_order_stream = router.dialogues[oid]
# } if (
# book._ems2brokerd_ids.inverse.get(reqid) is None
# ):
# # TODO: pretty sure we can drop this now?
# XXX: bleh, a fill can come after 'closed' from `ib`?
# only send a late fill event we haven't already closed
# out the dialog status locally.
status_msg = book._active.get(oid)
if status_msg:
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
# # XXX: paper clearing special cases
# # paper engine race case: ``Client.submit_limit()`` hasn't
# # returned yet and provided an output reqid to register
# # locally, so we need to retreive the oid that was already
# # packed at submission since we already know it ahead of
# # time
# paper = details.get('paper_info')
# ext = details.get('external')
# ``Status`` containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the
# brokerd backend.
case {
'name': 'status',
'resp': status,
'reqid': reqid, # brokerd generated order-request id
}:
if (
status != 'open'
):
# TODO: check for an oid we might know since it was
# registered from a previous order/status load?
log.error(
f'Unknown/transient status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
# if paper:
# # paperboi keeps the ems id up front
# oid = paper['oid']
# TODO: we probably want some kind of "tagging" system
# for external order submissions like this eventually
# to be able to more formally handle multi-player
# trading...
else:
# existing open backend order which we broadcast to
# all currently connected clients.
log.info(
f'Relaying existing open order:\n {brokerd_msg}'
)
# elif ext:
# # may be an order msg specified as "external" to the
# # piker ems flow (i.e. generated by some other
# # external broker backend client (like tws for ib)
# log.error(f"External trade event {name}@{ext}")
# use backend request id as our ems id though this
# may end up with collisions?
status_msg = Status(**brokerd_msg)
order = Order(**status_msg.req)
assert order.price and order.size
status_msg.req = order
# else:
# # something is out of order, we don't have an oid for
# # this broker-side message.
# log.error(
# f'Unknown oid: {oid} for msg {name}:\n'
# f'{pformat(brokerd_msg)}\n'
# 'Unable to relay message to client side!?'
# )
assert status_msg.src # source tag?
oid = str(status_msg.reqid)
# continue
# attempt to avoid collisions
status_msg.reqid = oid
assert status_msg.resp == 'open'
# register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid
book._active[oid] = status_msg
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
await router.client_broadcast(status_msg)
# don't fall through
continue
# brokerd error
case {
'name': 'status',
'status': 'error',
}:
log.error(f'Broker error:\n{pformat(brokerd_msg)}')
# XXX: we presume the brokerd cancels its own order
# TOO FAST ``BrokerdStatus`` that arrives
# before the ``BrokerdAck``.
case {
# XXX: sometimes there is a race with the backend (like
# `ib` where the pending stauts will be related before
# the ack, in which case we just ignore the faster
# pending msg and wait for our expected ack to arrive
# later (i.e. the first block below should enter).
'name': 'status',
'status': status,
'reqid': reqid,
}:
oid = book._ems2brokerd_ids.inverse.get(reqid)
msg = f'Unhandled broker status for dialog {reqid}:\n'
if oid:
status_msg = book._active[oid]
msg += (
f'last status msg: {pformat(status_msg)}\n\n'
f'this msg:{pformat(brokerd_msg)}\n'
)
log.warning(msg)
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow
entry = book._ems_entries[oid]
assert entry.oid == oid
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# XXX: ugh sometimes we don't access it?
if status_msg:
del status_msg
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
@ -829,27 +898,36 @@ async def process_client_order_cmds(
async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}')
oid = cmd['oid']
# CAWT DAMN we need struct support!
oid = str(cmd['oid'])
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
# any dark/live status which is current
status = dark_book._active.get(oid)
match cmd:
# existing live-broker order cancel
case {
'action': 'cancel',
'oid': oid,
} if live_entry:
reqid = live_entry.reqid
msg = BrokerdCancel(
} if (
(status := dark_book._active.get(oid))
and status.resp in ('open', 'pending')
):
reqid = status.reqid
order = status.req
to_brokerd_msg = BrokerdCancel(
oid=oid,
reqid=reqid,
time_ns=time.time_ns(),
account=live_entry.account,
# account=live_entry.account,
account=order.account,
)
# NOTE: cancel response will be relayed back in messages
@ -859,39 +937,53 @@ async def process_client_order_cmds(
log.info(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg)
await brokerd_order_stream.send(to_brokerd_msg)
else:
# this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when
# the order ack does show up later such that the brokerd
# order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg
# dark_book._ems_entries[oid] = msg
# special case for now..
status.req = to_brokerd_msg
# dark trigger cancel
case {
'action': 'cancel',
'oid': oid,
} if not live_entry:
try:
} if (
status and status.resp == 'dark_open'
# or status and status.req
):
# remove from dark book clearing
dark_book.orders[symbol].pop(oid, None)
entry = dark_book.orders[symbol].pop(oid, None)
if entry:
(
pred,
tickfilter,
cmd,
percent_away,
abs_diff_away
) = entry
# tell client side that we've cancelled the
# dark-trigger order
await client_order_stream.send(
Status(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
)
)
status.resp = 'canceled'
status.req = cmd
await client_order_stream.send(status)
# de-register this client dialogue
router.dialogues.pop(oid)
dark_book._active.pop(oid)
except KeyError:
else:
log.exception(f'No dark order for {symbol}?')
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
# live order submission
case {
'oid': oid,
@ -899,11 +991,9 @@ async def process_client_order_cmds(
'price': trigger_price,
'size': size,
'action': ('buy' | 'sell') as action,
'exec_mode': 'live',
'exec_mode': ('live' | 'paper'),
}:
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
# TODO: relay this order msg directly?
req = Order(**cmd)
broker = req.brokers[0]
@ -912,13 +1002,13 @@ async def process_client_order_cmds(
# aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '')
if live_entry is not None:
# sanity check on emsd id
assert live_entry.oid == oid
reqid = live_entry.reqid
if status is not None:
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(f"Modifying live {broker} order: {reqid}")
reqid = status.reqid
status.req = req
status.resp = 'pending'
msg = BrokerdOrder(
oid=oid, # no ib support for oids...
@ -935,6 +1025,18 @@ async def process_client_order_cmds(
account=req.account,
)
if status is None:
status = Status(
oid=oid,
reqid=reqid,
resp='pending',
time_ns=time.time_ns(),
brokerd_msg=msg,
req=req,
)
dark_book._active[oid] = status
# send request to backend
# XXX: the trades data broker response loop
# (``translate_and_relay_brokerd_events()`` above) will
@ -950,7 +1052,7 @@ async def process_client_order_cmds(
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that live order asap.
dark_book._ems_entries[oid] = msg
# dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark-order / alert submission
case {
@ -966,9 +1068,11 @@ async def process_client_order_cmds(
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
exec_mode in ('dark', 'paper')
exec_mode in ('dark',)
or action == 'alert'
):
req = Order(**cmd)
# Auto-gen scanner predicate:
# we automatically figure out what the alert check
# condition should be based on the current first
@ -1015,23 +1119,25 @@ async def process_client_order_cmds(
)[oid] = (
pred,
tickfilter,
cmd,
req,
percent_away,
abs_diff_away
)
resp = 'dark_submitted'
resp = 'dark_open'
# alerts have special msgs to distinguish
if action == 'alert':
resp = 'alert_submitted'
# if action == 'alert':
# resp = 'open'
await client_order_stream.send(
Status(
status = Status(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
req=req,
src='dark',
)
)
dark_book._active[oid] = status
await client_order_stream.send(status)
@tractor.context
@ -1099,10 +1205,9 @@ async def _emsd_main(
):
# XXX: this should be initial price quote from target provider
first_quote = feed.first_quotes[fqsn]
book = _router.get_dark_book(broker)
book.lasts[fqsn] = first_quote['last']
first_quote: dict = feed.first_quotes[fqsn]
book: _DarkBook = _router.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last']
# open a stream with the brokerd backend for order
# flow dialogue
@ -1129,12 +1234,25 @@ async def _emsd_main(
await ems_ctx.started((
relay.positions,
list(relay.accounts),
book._active,
))
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
async with ems_ctx.open_stream() as ems_client_order_stream:
# register the client side before startingn the
# brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup.
_router.clients.add(ems_client_order_stream)
n.start_soon(
translate_and_relay_brokerd_events,
broker,
brokerd_stream,
_router,
)
# trigger scan and exec loop
n.start_soon(
clear_dark_triggers,
@ -1149,7 +1267,6 @@ async def _emsd_main(
# start inbound (from attached client) order request processing
try:
_router.clients.add(ems_client_order_stream)
# main entrypoint, run here until cancelled.
await process_client_order_cmds(

View File

@ -18,24 +18,92 @@
Clearing sub-system message and protocols.
"""
from typing import Optional, Union
# from collections import (
# ChainMap,
# deque,
# )
from typing import (
Optional,
Literal,
)
from ..data._source import Symbol
from ..data.types import Struct
# TODO: a composite for tracking msg flow on 2-legged
# dialogs.
# class Dialog(ChainMap):
# '''
# Msg collection abstraction to easily track the state changes of
# a msg flow in one high level, query-able and immutable construct.
# The main use case is to query data from a (long-running)
# msg-transaction-sequence
# '''
# def update(
# self,
# msg,
# ) -> None:
# self.maps.insert(0, msg.to_dict())
# def flatten(self) -> dict:
# return dict(self)
# TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - schema evolution:
# https://jcristharif.com/msgspec/usage.html#schema-evolution
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# --------------
# Client -> emsd
# --------------
class Order(Struct):
# TODO: ideally we can combine these 2 fields into
# 1 and just use the size polarity to determine a buy/sell.
# i would like to see this become more like
# https://jcristharif.com/msgspec/usage.html#literal
# action: Literal[
# 'live',
# 'dark',
# 'alert',
# ]
action: Literal[
'buy',
'sell',
'alert',
]
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: Literal[
'dark',
'live',
# 'paper', no right?
]
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: str | Symbol
account: str # should we set a default as '' ?
price: float
size: float # -ve is "sell", +ve is "buy"
brokers: Optional[list[str]] = []
class Cancel(Struct):
'''Cancel msg for removing a dark (ems triggered) or
'''
Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
'''
@ -44,32 +112,6 @@ class Cancel(Struct):
symbol: str
class Order(Struct):
# TODO: use ``msgspec.Literal``
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: Union[str, Symbol]
account: str # should we set a default as '' ?
price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float
brokers: list[str]
# Assigned once initial ack is received
# ack_time_ns: Optional[int] = None
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
# --------------
# Client <- emsd
# --------------
@ -79,37 +121,39 @@ class Order(Struct):
class Status(Struct):
name: str = 'status'
oid: str # uuid4
time_ns: int
oid: str # uuid4 ems-order dialog id
# {
# 'dark_submitted',
# 'dark_cancelled',
# 'dark_triggered',
# 'broker_submitted',
# 'broker_cancelled',
# 'broker_executed',
# 'broker_filled',
# 'broker_errored',
# 'alert_submitted',
# 'alert_triggered',
# }
resp: str # "response", see above
# trigger info
trigger_price: Optional[float] = None
# price: float
# broker: Optional[str] = None
resp: Literal[
'pending', # acked by broker but not yet open
'open',
'dark_open', # dark/algo triggered order is open in ems clearing loop
'triggered', # above triggered order sent to brokerd, or an alert closed
'closed', # fully cleared all size/units
'fill', # partial execution
'canceled',
'error',
]
# this maps normally to the ``BrokerdOrder.reqid`` below, an id
# normally allocated internally by the backend broker routing system
broker_reqid: Optional[Union[int, str]] = None
reqid: Optional[int | str] = None
# for relaying backend msg data "through" the ems layer
# the (last) source order/request msg if provided
# (eg. the Order/Cancel which causes this msg) and
# acts as a back-reference to the corresponding
# request message which was the source of this msg.
req: Optional[Order | Cancel] = None
# XXX: better design/name here?
# flag that can be set to indicate a message for an order
# event that wasn't originated by piker's emsd (eg. some external
# trading system which does it's own order control but that you
# might want to "track" using piker UIs/systems).
src: Optional[str] = None
# for relaying a boxed brokerd-dialog-side msg data "through" the
# ems layer to clients.
brokerd_msg: dict = {}
@ -131,25 +175,28 @@ class BrokerdCancel(Struct):
# for setting a unique order id then this value will be relayed back
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
# field
reqid: Optional[Union[int, str]] = None
reqid: Optional[int | str] = None
class BrokerdOrder(Struct):
action: str # {buy, sell}
oid: str
account: str
time_ns: int
# TODO: if we instead rely on a +ve/-ve size to determine
# the action we more or less don't need this field right?
action: str = '' # {buy, sell}
# "broker request id": broker specific/internal order id if this is
# None, creates a new order otherwise if the id is valid the backend
# api must modify the existing matching order. If the broker allows
# for setting a unique order id then this value will be relayed back
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
# field
reqid: Optional[Union[int, str]] = None
reqid: Optional[int | str] = None
symbol: str # symbol.<providername> ?
symbol: str # fqsn
price: float
size: float
@ -170,7 +217,7 @@ class BrokerdOrderAck(Struct):
name: str = 'ack'
# defined and provided by backend
reqid: Union[int, str]
reqid: int | str
# emsd id originally sent in matching request msg
oid: str
@ -180,30 +227,22 @@ class BrokerdOrderAck(Struct):
class BrokerdStatus(Struct):
name: str = 'status'
reqid: Union[int, str]
reqid: int | str
time_ns: int
status: Literal[
'open',
'canceled',
'fill',
'pending',
'error',
]
# XXX: should be best effort set for every update
account: str = ''
# TODO: instead (ack, pending, open, fill, clos(ed), cancelled)
# {
# 'submitted',
# 'cancelled',
# 'filled',
# }
status: str
account: str
filled: float = 0.0
reason: str = ''
remaining: float = 0.0
# XXX: better design/name here?
# flag that can be set to indicate a message for an order
# event that wasn't originated by piker's emsd (eg. some external
# trading system which does it's own order control but that you
# might want to "track" using piker UIs/systems).
external: bool = False
# external: bool = False
# XXX: not required schema as of yet
broker_details: dict = {
@ -218,7 +257,7 @@ class BrokerdFill(Struct):
'''
name: str = 'fill'
reqid: Union[int, str]
reqid: int | str
time_ns: int
# order exeuction related
@ -248,7 +287,7 @@ class BrokerdError(Struct):
# if no brokerd order request was actually submitted (eg. we errored
# at the ``pikerd`` layer) then there will be ``reqid`` allocated.
reqid: Optional[Union[int, str]] = None
reqid: Optional[int | str] = None
symbol: str
reason: str

View File

@ -33,10 +33,10 @@ from bidict import bidict
import pendulum
import trio
import tractor
from dataclasses import dataclass
from .. import data
from ..data._source import Symbol
from ..data.types import Struct
from ..pp import (
Position,
Transaction,
@ -45,16 +45,20 @@ from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
from ..log import get_logger
from ._messages import (
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill, BrokerdPosition, BrokerdError
BrokerdCancel,
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdPosition,
BrokerdError,
)
log = get_logger(__name__)
@dataclass
class PaperBoi:
class PaperBoi(Struct):
"""
Emulates a broker order client providing the same API and
delivering an order-event response stream but with methods for
@ -68,8 +72,8 @@ class PaperBoi:
# map of paper "live" orders which be used
# to simulate fills based on paper engine settings
_buys: bidict
_sells: bidict
_buys: dict
_sells: dict
_reqids: bidict
_positions: dict[str, Position]
_trade_ledger: dict[str, Any]
@ -94,6 +98,10 @@ class PaperBoi:
'''
is_modify: bool = False
if action == 'alert':
# bypass all fill simulation
return reqid
entry = self._reqids.get(reqid)
if entry:
# order is already existing, this is a modify
@ -104,10 +112,6 @@ class PaperBoi:
# register order internally
self._reqids[reqid] = (oid, symbol, action, price)
if action == 'alert':
# bypass all fill simulation
return reqid
# TODO: net latency model
# we checkpoint here quickly particulalry
# for dark orders since we want the dark_executed
@ -119,7 +123,9 @@ class PaperBoi:
size = -size
msg = BrokerdStatus(
status='submitted',
status='open',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid,
time_ns=time.time_ns(),
filled=0.0,
@ -136,7 +142,14 @@ class PaperBoi:
) or (
action == 'sell' and (clear_price := self.last_bid[0]) >= price
):
await self.fake_fill(symbol, clear_price, size, action, reqid, oid)
await self.fake_fill(
symbol,
clear_price,
size,
action,
reqid,
oid,
)
else:
# register this submissions as a paper live order
@ -178,7 +191,9 @@ class PaperBoi:
await trio.sleep(0.05)
msg = BrokerdStatus(
status='cancelled',
status='canceled',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid,
time_ns=time.time_ns(),
broker_details={'name': 'paperboi'},
@ -230,25 +245,14 @@ class PaperBoi:
self._trade_ledger.update(fill_msg.to_dict())
if order_complete:
msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
status='filled',
# account=f'paper_{self.broker}',
account='paper',
status='closed',
filled=size,
remaining=0 if order_complete else remaining,
broker_details={
'paper_info': {
'oid': oid,
},
'action': action,
'size': size,
'price': price,
'name': self.broker,
},
)
await self.ems_trades_stream.send(msg)
@ -257,7 +261,10 @@ class PaperBoi:
pp = self._positions.setdefault(
token,
Position(
Symbol(key=symbol),
Symbol(
key=symbol,
broker_info={self.broker: {}},
),
size=size,
ppu=price,
bsuid=symbol,
@ -390,56 +397,41 @@ async def handle_order_requests(
) -> None:
# order_request: dict
request_msg: dict
async for request_msg in ems_order_stream:
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
match request_msg:
case {'action': ('buy' | 'sell')}:
order = BrokerdOrder(**request_msg)
account = order.account
if account != 'paper':
log.error(
'This is a paper account,'
' only a `paper` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
oid=order.oid,
symbol=order.symbol,
reason=f'Paper only. No account found: `{account}` ?',
))
continue
# validate
order = BrokerdOrder(**request_msg)
if order.reqid is None:
reqid = str(uuid.uuid4())
else:
reqid = order.reqid
reqid = order.reqid or str(uuid.uuid4())
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
# ems order request id
oid=order.oid,
# broker specific request id
reqid=reqid,
)
)
# call our client api to submit the order
reqid = await client.submit_limit(
oid=order.oid,
symbol=order.symbol,
price=order.price,
action=order.action,
size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
@ -447,17 +439,35 @@ async def handle_order_requests(
reqid=reqid,
)
elif action == 'cancel':
# elif action == 'cancel':
case {'action': 'cancel'}:
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(
reqid=msg.reqid
)
else:
case _:
log.error(f'Unknown order command: {request_msg}')
_reqids: bidict[str, tuple] = {}
_buys: dict[
str,
dict[
tuple[str, float],
tuple[float, str, str],
]
] = {}
_sells: dict[
str,
dict[
tuple[str, float],
tuple[float, str, str],
]
] = {}
_positions: dict[str, Position] = {}
@tractor.context
async def trades_dialogue(
@ -467,6 +477,7 @@ async def trades_dialogue(
loglevel: str = None,
) -> None:
tractor.log.get_console_log(loglevel)
async with (
@ -476,10 +487,22 @@ async def trades_dialogue(
) as feed,
):
pp_msgs: list[BrokerdPosition] = []
pos: Position
token: str # f'{symbol}.{self.broker}'
for token, pos in _positions.items():
pp_msgs.append(BrokerdPosition(
broker=broker,
account='paper',
symbol=pos.symbol.front_fqsn(),
size=pos.size,
avg_price=pos.ppu,
))
# TODO: load paper positions per broker from .toml config file
# and pass as symbol to position data mapping: ``dict[str, dict]``
# await ctx.started(all_positions)
await ctx.started(({}, ['paper']))
await ctx.started((pp_msgs, ['paper']))
async with (
ctx.open_stream() as ems_stream,
@ -488,13 +511,13 @@ async def trades_dialogue(
client = PaperBoi(
broker,
ems_stream,
_buys={},
_sells={},
_buys=_buys,
_sells=_sells,
_reqids={},
_reqids=_reqids,
# TODO: load paper positions from ``positions.toml``
_positions={},
_positions=_positions,
# TODO: load postions from ledger file
_trade_ledger={},

View File

@ -18,6 +18,7 @@
Built-in (extension) types.
"""
import sys
from typing import Optional
from pprint import pformat
@ -42,8 +43,16 @@ class Struct(
}
def __repr__(self):
# only turn on pprint when we detect a python REPL
# at runtime B)
if (
hasattr(sys, 'ps1')
# TODO: check if we're in pdb
):
return f'Struct({pformat(self.to_dict())})'
return super().__repr__()
def copy(
self,
update: Optional[dict] = None,

View File

@ -140,9 +140,9 @@ class LineEditor:
) -> LevelLine:
staged_line = self._active_staged_line
if not staged_line:
raise RuntimeError("No line is currently staged!?")
# staged_line = self._active_staged_line
# if not staged_line:
# raise RuntimeError("No line is currently staged!?")
# for now, until submission reponse arrives
line.hide_labels()

View File

@ -221,6 +221,7 @@ async def handle_viewmode_kb_inputs(
# TODO: show pp config mini-params in status bar widget
# mode.pp_config.show()
trigger_type: str = 'dark'
if (
# 's' for "submit" to activate "live" order
Qt.Key_S in pressed or
@ -228,9 +229,6 @@ async def handle_viewmode_kb_inputs(
):
trigger_type: str = 'live'
else:
trigger_type: str = 'dark'
# order mode trigger "actions"
if Qt.Key_D in pressed: # for "damp eet"
action = 'sell'
@ -397,8 +395,11 @@ class ChartView(ViewBox):
'''
if self._ic is None:
try:
self.chart.pause_all_feeds()
self._ic = trio.Event()
except RuntimeError:
pass
def signal_ic(
self,
@ -411,9 +412,12 @@ class ChartView(ViewBox):
'''
if self._ic:
try:
self._ic.set()
self._ic = None
self.chart.resume_all_feeds()
except RuntimeError:
pass
@asynccontextmanager
async def open_async_input_handler(
@ -669,7 +673,10 @@ class ChartView(ViewBox):
# XXX: WHY
ev.accept()
try:
self.start_ic()
except RuntimeError:
pass
# if self._ic is None:
# self.chart.pause_all_feeds()
# self._ic = trio.Event()

View File

@ -421,6 +421,10 @@ class LevelLine(pg.InfiniteLine):
return path
@property
def marker(self) -> LevelMarker:
return self._marker
def hoverEvent(self, ev):
'''
Mouse hover callback.

View File

@ -49,16 +49,21 @@ from ._position import (
SettingsPane,
)
from ._forms import FieldsForm
# from ._label import FormatLabel
from ._window import MultiStatus
from ..clearing._messages import Order, BrokerdPosition
from ..clearing._messages import (
Order,
Status,
# BrokerdOrder,
# BrokerdStatus,
BrokerdPosition,
)
from ._forms import open_form_input_handling
log = get_logger(__name__)
class OrderDialog(Struct):
class Dialog(Struct):
'''
Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart.
@ -74,38 +79,6 @@ class OrderDialog(Struct):
fills: Dict[str, Any] = {}
def on_level_change_update_next_order_info(
level: float,
# these are all ``partial``-ed in at callback assignment time.
line: LevelLine,
order: Order,
tracker: PositionTracker,
) -> None:
'''
A callback applied for each level change to the line
which will recompute the order size based on allocator
settings. this is assigned inside
``OrderMode.line_from_order()``
'''
# NOTE: the ``Order.account`` is set at order stage time
# inside ``OrderMode.line_from_order()``.
order_info = tracker.alloc.next_order_info(
startup_pp=tracker.startup_pp,
live_pp=tracker.live_pp,
price=level,
action=order.action,
)
line.update_labels(order_info)
# update bound-in staged order
order.price = level
order.size = order_info['size']
@dataclass
class OrderMode:
'''
@ -141,7 +114,7 @@ class OrderMode:
current_pp: Optional[PositionTracker] = None
active: bool = False
name: str = 'order'
dialogs: dict[str, OrderDialog] = field(default_factory=dict)
dialogs: dict[str, Dialog] = field(default_factory=dict)
_colors = {
'alert': 'alert_yellow',
@ -150,12 +123,45 @@ class OrderMode:
}
_staged_order: Optional[Order] = None
def on_level_change_update_next_order_info(
self,
level: float,
# these are all ``partial``-ed in at callback assignment time.
line: LevelLine,
order: Order,
tracker: PositionTracker,
) -> None:
'''
A callback applied for each level change to the line
which will recompute the order size based on allocator
settings. this is assigned inside
``OrderMode.line_from_order()``
'''
# NOTE: the ``Order.account`` is set at order stage time inside
# ``OrderMode.line_from_order()`` or is inside ``Order`` msg
# field for loaded orders.
order_info = tracker.alloc.next_order_info(
startup_pp=tracker.startup_pp,
live_pp=tracker.live_pp,
price=level,
action=order.action,
)
line.update_labels(order_info)
# update bound-in staged order
order.price = level
order.size = order_info['size']
# when an order is changed we flip the settings side-pane to
# reflect the corresponding account and pos info.
self.pane.on_ui_settings_change('account', order.account)
def line_from_order(
self,
order: Order,
symbol: Symbol,
**line_kwargs,
) -> LevelLine:
@ -173,8 +179,8 @@ class OrderMode:
color=self._colors[order.action],
dotted=True if (
order.exec_mode == 'dark' and
order.action != 'alert'
order.exec_mode == 'dark'
and order.action != 'alert'
) else False,
**line_kwargs,
@ -184,10 +190,12 @@ class OrderMode:
# immediately
if order.action != 'alert':
line._on_level_change = partial(
on_level_change_update_next_order_info,
self.on_level_change_update_next_order_info,
line=line,
order=order,
tracker=self.current_pp,
# use the corresponding position tracker for the
# order's account.
tracker=self.trackers[order.account],
)
else:
@ -236,8 +244,6 @@ class OrderMode:
line = self.line_from_order(
order,
symbol,
show_markers=True,
# just for the stage line to avoid
# flickering while moving the cursor
@ -249,7 +255,6 @@ class OrderMode:
# prevent flickering of marker while moving/tracking cursor
only_show_markers_on_hover=False,
)
line = self.lines.stage_line(line)
# hide crosshair y-line and label
@ -262,25 +267,26 @@ class OrderMode:
def submit_order(
self,
send_msg: bool = True,
order: Optional[Order] = None,
) -> OrderDialog:
) -> Dialog:
'''
Send execution order to EMS return a level line to
represent the order on a chart.
'''
if not order:
staged = self._staged_order
symbol: Symbol = staged.symbol
# apply order fields for ems
oid = str(uuid.uuid4())
# format order data for ems
order = staged.copy()
order.oid = oid
order.symbol = symbol.front_fqsn()
order.symbol = order.symbol.front_fqsn()
line = self.line_from_order(
order,
symbol,
show_markers=True,
only_show_markers_on_hover=True,
@ -298,17 +304,17 @@ class OrderMode:
# color once the submission ack arrives.
self.lines.submit_line(
line=line,
uuid=oid,
uuid=order.oid,
)
dialog = OrderDialog(
uuid=oid,
dialog = Dialog(
uuid=order.oid,
order=order,
symbol=symbol,
symbol=order.symbol,
line=line,
last_status_close=self.multistatus.open_status(
f'submitting {self._trigger_type}-{order.action}',
final_msg=f'submitted {self._trigger_type}-{order.action}',
f'submitting {order.exec_mode}-{order.action}',
final_msg=f'submitted {order.exec_mode}-{order.action}',
clear_on_next=True,
)
)
@ -318,14 +324,21 @@ class OrderMode:
# enter submission which will be popped once a response
# from the EMS is received to move the order to a different# status
self.dialogs[oid] = dialog
self.dialogs[order.oid] = dialog
# hook up mouse drag handlers
line._on_drag_start = self.order_line_modify_start
line._on_drag_end = self.order_line_modify_complete
# send order cmd to ems
if send_msg:
self.book.send(order)
else:
# just register for control over this order
# TODO: some kind of mini-perms system here based on
# an out-of-band tagging/auth sub-sys for multiplayer
# order control?
self.book._sent_orders[order.oid] = order
return dialog
@ -363,7 +376,7 @@ class OrderMode:
self,
uuid: str
) -> OrderDialog:
) -> Dialog:
'''
Order submitted status event handler.
@ -418,7 +431,7 @@ class OrderMode:
self,
uuid: str,
msg: Dict[str, Any],
msg: Status,
) -> None:
@ -442,7 +455,7 @@ class OrderMode:
# TODO: add in standard fill/exec info that maybe we
# pack in a broker independent way?
f'{msg["resp"]}: {msg["trigger_price"]}',
f'{msg.resp}: {msg.req.price}',
],
)
log.runtime(result)
@ -502,7 +515,7 @@ class OrderMode:
oid = dialog.uuid
cancel_status_close = self.multistatus.open_status(
f'cancelling order {oid[:6]}',
f'cancelling order {oid}',
group_key=key,
)
dialog.last_status_close = cancel_status_close
@ -512,6 +525,44 @@ class OrderMode:
return ids
def load_unknown_dialog_from_msg(
self,
msg: Status,
) -> Dialog:
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
order = msg.req
oid = str(msg.oid)
symbol = order.symbol
# TODO: MEGA UGGG ZONEEEE!
src = msg.src
if (
src
and src not in ('dark', 'paperboi')
and src not in symbol
):
fqsn = symbol + '.' + src
brokername = src
else:
fqsn = symbol
*head, brokername = fqsn.rsplit('.')
# fill out complex fields
order.oid = str(order.oid)
order.brokers = [brokername]
order.symbol = Symbol.from_fqsn(
fqsn=fqsn,
info={},
)
dialog = self.submit_order(
send_msg=False,
order=order,
)
assert self.dialogs[oid] == dialog
return dialog
@asynccontextmanager
async def open_order_mode(
@ -549,6 +600,7 @@ async def open_order_mode(
trades_stream,
position_msgs,
brokerd_accounts,
ems_dialog_msgs,
),
trio.open_nursery() as tn,
@ -596,10 +648,10 @@ async def open_order_mode(
sym = msg['symbol']
if (
sym == symkey or
# mega-UGH, i think we need to fix the FQSN stuff sooner
# then later..
sym == symkey.removesuffix(f'.{broker}')
(sym == symkey) or (
# mega-UGH, i think we need to fix the FQSN
# stuff sooner then later..
sym == symkey.removesuffix(f'.{broker}'))
):
pps_by_account[acctid] = msg
@ -703,7 +755,7 @@ async def open_order_mode(
order_pane.order_mode = mode
# select a pp to track
tracker = trackers[pp_account]
tracker: PositionTracker = trackers[pp_account]
mode.current_pp = tracker
tracker.show()
tracker.hide_info()
@ -755,38 +807,61 @@ async def open_order_mode(
# to handle input since the ems connection is ready
started.set()
for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side?
# msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid'])
msg['brokerd_msg'] = msg
await process_trade_msg(
mode,
book,
msg,
)
tn.start_soon(
process_trades_and_update_ui,
tn,
feed,
mode,
trades_stream,
mode,
book,
)
yield mode
async def process_trades_and_update_ui(
n: trio.Nursery,
feed: Feed,
mode: OrderMode,
trades_stream: tractor.MsgStream,
mode: OrderMode,
book: OrderBook,
) -> None:
get_index = mode.chart.get_index
global _pnl_tasks
# this is where we receive **back** messages
# about executions **from** the EMS actor
async for msg in trades_stream:
await process_trade_msg(
mode,
book,
msg,
)
async def process_trade_msg(
mode: OrderMode,
book: OrderBook,
msg: dict,
) -> tuple[Dialog, Status]:
get_index = mode.chart.get_index
fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}')
log.debug(f'Received order msg:\n{fmsg}')
name = msg['name']
if name in (
'position',
):
@ -811,95 +886,107 @@ async def process_trades_and_update_ui(
# short circuit to next msg to avoid
# unnecessary msg content lookups
continue
return
resp = msg['resp']
oid = msg['oid']
msg = Status(**msg)
resp = msg.resp
oid = msg.oid
dialog: Dialog = mode.dialogs.get(oid)
dialog = mode.dialogs.get(oid)
if dialog is None:
log.warning(f'received msg for untracked dialog:\n{fmsg}')
# TODO: enable pure tracking / mirroring of dialogs
# is desired.
continue
# record message to dialog tracking
dialog.msgs[oid] = msg
# response to 'action' request (buy/sell)
if resp in (
'dark_submitted',
'broker_submitted'
):
match msg:
case Status(resp='dark_open' | 'open'):
if dialog is not None:
# show line label once order is live
mode.on_submit(oid)
# resp to 'cancel' request or error condition
# for action request
elif resp in (
'broker_inactive',
'broker_errored',
else:
log.warning(
f'received msg for untracked dialog:\n{fmsg}'
)
assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}'
sym = mode.chart.linked.symbol
fqsn = sym.front_fqsn()
order = Order(**msg.req)
if (
((order.symbol + f'.{msg.src}') == fqsn)
# a existing dark order for the same symbol
or (
order.symbol == fqsn
and (
msg.src in ('dark', 'paperboi')
or (msg.src in fqsn)
)
)
):
msg.req = order
dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid)
# return dialog, msg
case Status(resp='error'):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
broker_msg = msg.brokerd_msg
log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
elif resp in (
'broker_cancelled',
'dark_cancelled'
):
case Status(resp='canceled'):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.cancel(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
req = Order(**msg.req)
log.cancel(f'Canceled {req.action}:{oid}')
elif resp in (
'dark_triggered'
case Status(
resp='triggered',
# req=Order(exec_mode='dark') # TODO:
req={'exec_mode': 'dark'},
):
# TODO: UX for a "pending" clear/live order
log.info(f'Dark order triggered for {fmsg}')
elif resp in (
'alert_triggered'
case Status(
resp='triggered',
# req=Order(exec_mode='live', action='alert') as req, # TODO
req={'exec_mode': 'live', 'action': 'alert'} as req,
):
# should only be one "fill" for an alert
# add a triangle and remove the level line
req = Order(**req)
mode.on_fill(
oid,
price=msg['trigger_price'],
price=req.price,
arrow_index=get_index(time.time()),
)
mode.lines.remove_line(uuid=oid)
msg.req = req
await mode.on_exec(oid, msg)
# response to completed 'action' request for buy/sell
elif resp in (
'broker_executed',
# response to completed 'dialog' for order request
case Status(
resp='closed',
# req=Order() as req, # TODO
req=req,
):
# right now this is just triggering a system alert
msg.req = Order(**req)
await mode.on_exec(oid, msg)
if msg['brokerd_msg']['remaining'] == 0:
mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually
elif resp in (
'broker_filled',
):
case Status(resp='fill'):
# handle out-of-piker fills reporting?
known_order = book._sent_orders.get(oid)
if not known_order:
log.warning(f'order {oid} is unknown')
continue
return
action = known_order.action
details = msg['brokerd_msg']
details = msg.brokerd_msg
# TODO: some kinda progress system
mode.on_fill(
@ -914,3 +1001,9 @@ async def process_trades_and_update_ui(
# TODO: how should we look this up?
# tracker = mode.trackers[msg['account']]
# tracker.live_pp.fills.append(msg)
# record message to dialog tracking
if dialog:
dialog.msgs[oid] = msg
return dialog, msg