Compare commits

...

21 Commits

Author SHA1 Message Date
Tyler Goodlet 2f6e3ad03f Add dict differ helpers from SO answer 2022-08-11 16:18:05 -04:00
Tyler Goodlet b75683879a Only pprint our struct when we detect a py REPL 2022-08-11 15:56:28 -04:00
Tyler Goodlet db8a3dd1b7 Move fill case-block earlier, log broker errors 2022-08-11 14:26:34 -04:00
Tyler Goodlet 2d92ed2052 Drop `msgpack` from `marketstore` module 2022-08-11 14:21:01 -04:00
Tyler Goodlet 0756cb0289 Load boxed `.req` values as `Order`s in mode loop 2022-08-11 14:20:23 -04:00
Tyler Goodlet 66f7dd9020 'Only send `'closed'` on Filled events, lowercase all statues' 2022-08-11 14:18:53 -04:00
Tyler Goodlet 9782107153 First try mega-basic stock (reverse) split support with `ib` and `pps.toml` 2022-08-10 18:19:44 -04:00
Tyler Goodlet 1f43f660fe Passthrough filled and pendingsubmit cases 2022-08-10 18:03:25 -04:00
Tyler Goodlet d3b7d0e247 Include both symbols in error msg when a mismatch 2022-08-10 17:59:27 -04:00
Tyler Goodlet 700dbf0e2b Handle 'closed' vs. 'fill` race case..
`ib` is super good not being reliable with order event sequence order
and duplication of fill info. This adds some guards to try and avoid
popping the last status status too early if we end up receiving
a `'closed'` before the expected `'fill`' event(s). Further delete the
`status_msg` ref on each iteration to avoid stale reference lookups in
the relay task/loop.
2022-08-10 17:17:47 -04:00
Tyler Goodlet b52c4092f3 Use modern `Union` pipe op syntax for msg fields 2022-08-10 16:41:00 -04:00
Tyler Goodlet 7fe3e3f482 Add full EMS order-dialog (re-)load support!
This includes darks, lives and alerts with all connecting clients
being broadcast all existing order-flow dialog states. Obviously
for now darks and alerts only live as long as the `emsd` actor lifetime
(though we will store these in local state eventually) and "live" orders
have lifetimes managed by their respective backend broker.

The details of this change-set is extensive, so here we go..

Messaging schema:
- change the messaging `Status` status-key set to:
  `resp: Literal['pending', 'open', 'dark_open', 'triggered',
                'closed',  'fill', 'canceled', 'error']`

  which better reflects the semantics of order lifetimes and was
  partially inspired by the status keys `kraken` provides for their
  order-entry API. The prior key set was based on `ib`'s horrible
  semantics which sound like they're right out of the 80s..
  Also, we reflect this same set in the `BrokerdStatus` msg and likely
  we'll just get rid of the separate brokerd-dialog side type
  eventually.
- use `Literal` type annots for statuses where applicable and as they
  are supported by `msgspec`.
- add additional optional `Status` fields:
  -`req: Order` to allow each status msg to optionally ref its
    commanding order-request msg allowing at least a request-response
    style implicit tracing in all response msgs.
  -`src: str` tag string to show the source of the msg.
  -`reqid: str | int` such that the ems can relay the `brokerd`
    request id both to the client side and have one spot to look
    up prior status msgs and
- draft a (unused/commented) `Dialog` type which can be eventually used
  at all EMS endpoints to track msg-flow states

EMS engine adjustments/rework:
- use the new status key set throughout and expect `BrokerdStatus` msgs
  to use the same new schema as `Status`.
- add a `_DarkBook._active: dict[str, Status]` table which is now used for
  all per-leg-dialog associations and order flow state tracking
  allowing for the both the brokerd-relay and client-request handler loops
  to read/write the same msg-table and provides for delivering
  the overall EMS-active-orders state to newly/re-connecting clients
  with minimal processing; this table replaces what the `._ems_entries`
  table from prior.
- add `Router.client_broadcast()` to send a msg to all currently
  connected peers.
- a variety of msg handler block logic tweaks including more `case:`
  blocks to be both flatter and improve explicitness:
  - for the relay loop move all `Status` msg update and sending to
    within each block instead of a fallthrough case plus hard-to-follow
    state logic.
  - add a specific case for unhandled backend status keys and just log
    them.
  - pop alerts from `._active` immediately once triggered.
  - where possible mutate status msgs fields over instantiating new
    ones.
- insert and expect `Order` instances in the dark clearing loop and
  adjust `case:` blocks accordingly.
- tag `dark_open` and `triggered` statuses as sourced from the ems.
- drop all the `ChainMap` stuff for now; we're going to make our own
  `Dialog` type for this purpose..

Order mode rework:
- always parse the `Status` msg and use match syntax cases with object
  patterns, hackily assign the `.req` in many blocks to work around not
  yet having proper on-the-wire decoding yet.
- make `.load_unknown_dialog_from_msg()` expect a `Status` with boxed
  `.req: Order` as input.
- change `OrderDialog` -> `Dialog` in prep for a general purpose type
  of the same name.

`ib` backend order loading support:
- do "closed" status detection inside the msg-relay loop instead
  of expecting the ems to do this..
- add an attempt to cancel inactive orders by scheduling cancel
  submissions continually (no idea if this works).
- add a status map to go from the 80s keys to our new set.
- deliver `Status` msgs with an embedded `Order` for existing live order
  loading and make sure to try an get the source exchange info (instead
  of SMART).

Paper engine ported to match:
- use new status keys in `BrokerdStatus` msgs
- use `match:` syntax in request handler loop
2022-08-10 13:38:23 -04:00
Tyler Goodlet bbbdcad33b WIP playing with a `ChainMap` of messages 2022-08-08 13:47:41 -04:00
Tyler Goodlet a3812cd169 Fix for TWS created position loading 2022-08-08 13:47:17 -04:00
Tyler Goodlet 5ac5743c66 Deliver existing dialog (msgs) to every EMS client
Ideally every client that connects to the ems can know its state
(immediately) meaning relay all the order dialogs that are currently
active. This adds full (hacky WIP) support to receive those dialog
(msgs) from the `open_ems()` startup values via the `.started()` msg
from `_emsd_main()`.

Further this adds support to the order mode chart-UI to display existing
(live) orders on the chart during startup. Details include,

- add a `OrderMode.load_unknown_dialog_from_msg()` for processing and
  displaying a ``BrokerdStatus`` (for now) msg from the EMS that was not
  previously created by the current ems client and registering and
  displaying it on the chart.
- break out the ems msg processing into a new
  `order_mode.process_trade_msg()` func so that it can be called on the
  startup dialog-msg set as well as eventually used a more general low
  level auto-strat API (eg. when we get to displaying auto-strat and
  group trading automatically on an observing chart UI.
- hackyness around msg-processing for the dialogs delivery since we're
  technically delivering `BrokerdStatus` msgs when the client-side
  processing technically expects `Status` msgs.. we'll rectify this
  soon!
2022-08-05 21:04:31 -04:00
Tyler Goodlet aa204228ab Lol, handle failed-to-cancel statuses.. 2022-08-05 21:04:31 -04:00
Tyler Goodlet 0bd8f2bcd9 Start brokerd relay loop after opening client stream
In order to avoid missed existing order message emissions on startup we
need to be sure the client side stream is registered with the router
first. So break out the starting of the
`translate_and_relay_brokerd_events()` task until inside the client
stream block and start the task using the dark clearing loop nursery.

Also, ensure `oid` (and thus for `ib` the equivalent re-used `reqid`)
are cast to `str` before registering the dark book. Deliver the dark
book entries as part of the `_emsd_main()` context `.started()` values.
2022-08-05 21:04:31 -04:00
Tyler Goodlet 334f512ad3 Always cast ems `requid` values to `int` 2022-08-05 21:04:31 -04:00
Tyler Goodlet 71cca4ceda Drop staged line runtime guard 2022-08-05 21:04:31 -04:00
Tyler Goodlet 0d332427e2 First draft: relay open orders through ems and display on chart 2022-08-05 21:04:31 -04:00
Tyler Goodlet 02980282cd Relay existing open orders from ib on startup 2022-08-05 21:04:31 -04:00
11 changed files with 1016 additions and 526 deletions

View File

@ -36,8 +36,6 @@ from trio_typing import TaskStatus
import tractor
from ib_insync.contract import (
Contract,
# Option,
# Forex,
)
from ib_insync.order import (
Trade,
@ -61,6 +59,8 @@ from piker.pp import (
)
from piker.log import get_console_log
from piker.clearing._messages import (
Order,
Status,
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
@ -123,11 +123,13 @@ async def handle_order_requests(
f'An IB account number for name {account} is not found?\n'
'Make sure you have all TWS and GW instances running.'
)
await ems_order_stream.send(BrokerdError(
await ems_order_stream.send(
BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?',
))
)
)
continue
client = _accounts2clients.get(account)
@ -147,6 +149,14 @@ async def handle_order_requests(
# validate
order = BrokerdOrder(**request_msg)
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid = order.reqid
if reqid is not None:
reqid = int(reqid)
# call our client api to submit the order
reqid = client.submit_limit(
oid=order.oid,
@ -155,12 +165,7 @@ async def handle_order_requests(
action=order.action,
size=order.size,
account=acct_number,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid=order.reqid,
reqid=reqid,
)
if reqid is None:
await ems_order_stream.send(BrokerdError(
@ -180,9 +185,9 @@ async def handle_order_requests(
)
)
elif action == 'cancel':
if action == 'cancel':
msg = BrokerdCancel(**request_msg)
client.submit_cancel(reqid=msg.reqid)
client.submit_cancel(reqid=int(msg.reqid))
else:
log.error(f'Unknown order command: {request_msg}')
@ -357,11 +362,24 @@ async def update_and_audit_msgs(
# presume we're at least not more in the shit then we
# thought.
if diff:
reverse_split_ratio = pikersize / ibsize
split_ratio = 1/reverse_split_ratio
if split_ratio >= reverse_split_ratio:
entry = f'split_ratio = {int(split_ratio)}'
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
raise ValueError(
f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n'
f'piker: {msg}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
f'reverse_split_ratio: {reverse_split_ratio}\n'
f'split_ratio: {split_ratio}\n\n'
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
'If you are expecting a (reverse) split in this '
'instrument you should probably put the following '
f'in the `pps.toml` section:\n{entry}'
)
msg.size = ibsize
@ -439,7 +457,6 @@ async def trades_dialogue(
# we might also want to delegate a specific actor for
# ledger writing / reading for speed?
async with (
# trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients),
):
# Open a trade ledgers stack for appending trade records over
@ -468,6 +485,52 @@ async def trades_dialogue(
client = aioclients[account]
trades: list[Trade] = client.ib.openTrades()
order_msgs = []
for trade in trades:
order = trade.order
quant = trade.order.totalQuantity
action = order.action.lower()
size = {
'sell': -1,
'buy': 1,
}[action] * quant
con = trade.contract
# TODO: in the case of the SMART venue (aka ib's
# router-clearing sys) we probably should handle
# showing such orders overtop of the fqsn for the
# primary exchange, how to map this easily is going
# to be a bit tricky though?
deats = await proxy.con_deats(contracts=[con])
fqsn = list(deats)[0]
reqid = order.orderId
# TODO: maybe embed a ``BrokerdOrder`` instead
# since then we can directly load it on the client
# side in the order mode loop?
msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=str(reqid),
reqid=reqid,
# embedded order info
req=Order(
action=action,
exec_mode='live',
oid=str(reqid),
symbol=fqsn,
account=accounts_def.inverse[order.account],
price=order.lmtPrice,
size=size,
),
src='ib',
)
order_msgs.append(msg)
# process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses
# the so called (bs) "FIFO" style which more or less results in
@ -480,6 +543,7 @@ async def trades_dialogue(
# sure know which positions to update from the ledger if
# any are missing from the ``pps.toml``
bsuid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.')
cids2pps[(acctid, bsuid)] = msg
@ -493,9 +557,7 @@ async def trades_dialogue(
or pp.size != msg.size
):
trans = norm_trade_records(ledger)
updated = table.update_from_trans(trans)
pp = updated[bsuid]
table.update_from_trans(trans)
# update trades ledgers for all accounts from connected
# api clients which report trades for **this session**.
trades = await proxy.trades()
@ -521,9 +583,28 @@ async def trades_dialogue(
trans = trans_by_acct.get(acctid)
if trans:
table.update_from_trans(trans)
updated = table.update_from_trans(trans)
assert msg.size == pp.size, 'WTF'
# XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but
# if you create a pos from TWS and then load it
# from the api trades it seems we get a key
# error from ``update[bsuid]`` ?
pp = table.pps.get(bsuid)
if not pp:
log.error(
f'The contract id for {msg} may have '
f'changed to {bsuid}\nYou may need to '
'adjust your ledger for this, skipping '
'for now.'
)
continue
if msg.size != pp.size:
log.error(
'Position mismatch {pp.symbol.front_fqsn()}:\n'
f'ib: {msg.size}\n'
f'piker: {pp.size}\n'
)
active_pps, closed_pps = table.dump_active()
@ -575,6 +656,10 @@ async def trades_dialogue(
ctx.open_stream() as ems_stream,
trio.open_nursery() as n,
):
# relay existing open orders to ems
for msg in order_msgs:
await ems_stream.send(msg)
trade_event_stream = await n.start(open_trade_event_stream)
clients.append((client, trade_event_stream))
@ -586,6 +671,7 @@ async def trades_dialogue(
for client, stream in clients:
n.start_soon(
deliver_trade_events,
n,
stream,
ems_stream,
accounts_def,
@ -661,8 +747,24 @@ async def emit_pp_update(
await ems_stream.send(msg)
_statuses: dict[str, str] = {
'cancelled': 'canceled',
'submitted': 'open',
# XXX: just pass these through? it duplicates actual fill events other
# then the case where you the `.remaining == 0` case which is our
# 'closed'` case.
# 'filled': 'pending',
# 'pendingsubmit': 'pending',
# TODO: see a current ``ib_insync`` issue around this:
# https://github.com/erdewit/ib_insync/issues/363
'inactive': 'pending',
}
async def deliver_trade_events(
nurse: trio.Nursery,
trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream,
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
@ -718,6 +820,45 @@ async def deliver_trade_events(
# unwrap needed data from ib_insync internal types
trade: Trade = item
status: OrderStatus = trade.orderStatus
ib_status_key = status.status.lower()
acctid = accounts_def.inverse[trade.order.account]
# double check there is no error when
# cancelling.. gawwwd
if ib_status_key == 'cancelled':
last_log = trade.log[-1]
if (
last_log.message
and 'Error' not in last_log.message
):
ib_status_key = trade.log[-2].status
print(ib_status_key)
elif ib_status_key == 'inactive':
async def sched_cancel():
log.warning(
'OH GAWD an inactive order..scheduling a cancel\n'
f'{pformat(item)}'
)
proxy = proxies[acctid]
await proxy.submit_cancel(reqid=trade.order.orderId)
await trio.sleep(1)
nurse.start_soon(sched_cancel)
nurse.start_soon(sched_cancel)
status_key = (
_statuses.get(ib_status_key)
or ib_status_key.lower()
)
remaining = status.remaining
if (
status_key == 'filled'
and remaining == 0
):
status_key = 'closed'
# skip duplicate filled updates - we get the deats
# from the execution details event
@ -728,14 +869,14 @@ async def deliver_trade_events(
account=accounts_def.inverse[trade.order.account],
# everyone doin camel case..
status=status.status.lower(), # force lower case
status=status_key, # force lower case
filled=status.filled,
reason=status.whyHeld,
# this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess?
remaining=status.remaining,
remaining=remaining,
broker_details={'name': 'ib'},
)
@ -870,17 +1011,25 @@ async def deliver_trade_events(
if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}')
# TODO: what schema for this msg if we're going to make it
# portable across all backends?
# msg = BrokerdError(**err)
# TODO: we don't want to relay data feed / lookup errors
# so we need some further filtering logic here..
# for most cases the 'status' block above should take
# care of this.
# await ems_stream.send(BrokerdStatus(
# status='error',
# reqid=err['reqid'],
# reason=err['reason'],
# time_ns=time.time_ns(),
# account=accounts_def.inverse[trade.order.account],
# broker_details={'name': 'ib'},
# ))
case 'position':
cid, msg = pack_position(item)
log.info(f'New IB position msg: {msg}')
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg)
continue
case 'event':

View File

@ -101,3 +101,30 @@ def percent_change(
new: float,
) -> float:
return pnl(init, new) * 100.
def diff_dict(
d1: dict,
d2: dict,
) -> dict:
d1_keys = set(d1.keys())
d2_keys = set(d2.keys())
shared_keys = d1_keys.intersection(d2_keys)
shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]}
added_keys = d2_keys - d1_keys
added_deltas = {o: (None, d2[o]) for o in added_keys}
deltas = {**shared_deltas, **added_deltas}
return parse_deltas(deltas)
def parse_deltas(deltas: dict) -> dict:
res = {}
for k, v in deltas.items():
if isinstance(v[0], dict):
tmp = diff_dict(v[0], v[1])
if tmp:
res[k] = tmp
else:
res[k] = v[1]
return res

View File

@ -83,7 +83,13 @@ class OrderBook:
"""Cancel an order (or alert) in the EMS.
"""
cmd = self._sent_orders[uuid]
cmd = self._sent_orders.get(uuid)
if not cmd:
log.error(
f'Unknown order {uuid}!?\n'
f'Maybe there is a stale entry or line?\n'
f'You should report this as a bug!'
)
msg = Cancel(
oid=uuid,
symbol=cmd.symbol,
@ -149,10 +155,17 @@ async def relay_order_cmds_from_sync_code(
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}')
sym = cmd.symbol
msg = pformat(cmd)
if sym == symbol_key:
log.info(f'Send order cmd:\n{msg}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)
else:
log.warning(
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
f'\n{msg}'
)
@acm
@ -220,11 +233,19 @@ async def open_ems(
fqsn=fqsn,
exec_mode=mode,
) as (ctx, (positions, accounts)),
) as (
ctx,
(
positions,
accounts,
dialogs,
)
),
# open 2-way trade command stream
ctx.open_stream() as trades_stream,
):
# start sync code order msg delivery task
async with trio.open_nursery() as n:
n.start_soon(
relay_order_cmds_from_sync_code,
@ -232,4 +253,10 @@ async def open_ems(
trades_stream
)
yield book, trades_stream, positions, accounts
yield (
book,
trades_stream,
positions,
accounts,
dialogs,
)

View File

@ -18,8 +18,8 @@
In da suit parlances: "Execution management systems"
"""
from collections import defaultdict, ChainMap
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
@ -27,6 +27,7 @@ from typing import (
AsyncIterator,
Any,
Callable,
Optional,
)
from bidict import bidict
@ -41,9 +42,16 @@ from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper
from ._messages import (
Status, Order,
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill, BrokerdError, BrokerdPosition,
Order,
Status,
# Cancel,
BrokerdCancel,
BrokerdOrder,
# BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdError,
BrokerdPosition,
)
@ -90,8 +98,7 @@ def mk_check(
)
@dataclass
class _DarkBook:
class _DarkBook(Struct):
'''
EMS-trigger execution book.
@ -116,17 +123,24 @@ class _DarkBook:
dict, # cmd / msg type
]
]
] = field(default_factory=dict)
] = {}
# tracks most recent values per symbol each from data feed
lasts: dict[
str,
float,
] = field(default_factory=dict)
] = {}
# mapping of piker ems order ids to current brokerd order flow message
_ems_entries: dict[str, str] = field(default_factory=dict)
_ems2brokerd_ids: dict[str, str] = field(default_factory=bidict)
# _ems_entries: dict[str, str] = {}
_active: dict = {}
# mapping of ems dialog ids to msg flow history
_msgflows: defaultdict[
int,
ChainMap[dict[str, dict]],
] = defaultdict(ChainMap)
_ems2brokerd_ids: dict[str, str] = bidict()
# XXX: this is in place to prevent accidental positions that are too
@ -181,6 +195,7 @@ async def clear_dark_triggers(
for oid, (
pred,
tf,
# TODO: send this msg instead?
cmd,
percent_away,
abs_diff_away
@ -188,9 +203,9 @@ async def clear_dark_triggers(
tuple(execs.items())
):
if (
not pred or
ttype not in tf or
not pred(price)
not pred
or ttype not in tf
or not pred(price)
):
# log.runtime(
# f'skipping quote for {sym} '
@ -200,30 +215,29 @@ async def clear_dark_triggers(
# majority of iterations will be non-matches
continue
brokerd_msg: Optional[BrokerdOrder] = None
match cmd:
# alert: nothing to do but relay a status
# back to the requesting ems client
case {
'action': 'alert',
}:
resp = 'alert_triggered'
case Order(action='alert'):
resp = 'triggered'
# executable order submission
case {
'action': action,
'symbol': symbol,
'account': account,
'size': size,
}:
case Order(
action=action,
symbol=symbol,
account=account,
size=size,
):
bfqsn: str = symbol.replace(f'.{broker}', '')
submit_price = price + abs_diff_away
resp = 'dark_triggered' # hidden on client-side
resp = 'triggered' # hidden on client-side
log.info(
f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}')
live_req = BrokerdOrder(
brokerd_msg = BrokerdOrder(
action=action,
oid=oid,
account=account,
@ -232,7 +246,8 @@ async def clear_dark_triggers(
price=submit_price,
size=size,
)
await brokerd_orders_stream.send(live_req)
await brokerd_orders_stream.send(brokerd_msg)
# mark this entry as having sent an order
# request. the entry will be replaced once the
@ -240,18 +255,19 @@ async def clear_dark_triggers(
# a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems.
book._ems_entries[oid] = live_req
# book._ems_entries[oid] = live_req
# book._msgflows[oid].maps.insert(0, live_req)
case _:
raise ValueError(f'Invalid dark book entry: {cmd}')
# fallthrough logic
resp = Status(
status = Status(
oid=oid, # ems dialog id
time_ns=time.time_ns(),
resp=resp,
trigger_price=price,
brokerd_msg=cmd,
req=cmd,
brokerd_msg=brokerd_msg,
)
# remove exec-condition from set
@ -262,9 +278,18 @@ async def clear_dark_triggers(
f'pred for {oid} was already removed!?'
)
# update actives
if cmd.action == 'alert':
# don't register the alert status (so it won't
# be reloaded by clients) since it's now
# complete / closed.
book._active.pop(oid)
else:
book._active[oid] = status
# send response to client-side
try:
await ems_client_order_stream.send(resp)
await ems_client_order_stream.send(status)
except (
trio.ClosedResourceError,
):
@ -281,8 +306,7 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}')
@dataclass
class TradesRelay:
class TradesRelay(Struct):
# for now we keep only a single connection open with
# each ``brokerd`` for simplicity.
@ -318,7 +342,10 @@ class Router(Struct):
# order id to client stream map
clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {}
dialogues: dict[
str,
list[tractor.MsgStream]
] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {}
@ -341,11 +368,12 @@ class Router(Struct):
loglevel: str,
) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none
already exists.
'''
Open and yield ``brokerd`` trades dialogue context-stream if
none already exists.
'''
relay = self.relays.get(feed.mod.name)
relay: TradesRelay = self.relays.get(feed.mod.name)
if (
relay is None
@ -381,6 +409,22 @@ class Router(Struct):
relay.consumers -= 1
async def client_broadcast(
self,
msg: dict,
) -> None:
for client_stream in self.clients.copy():
try:
await client_stream.send(msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
self.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
_router: Router = None
@ -452,7 +496,6 @@ async def open_brokerd_trades_dialogue(
async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream,
):
# XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're
@ -502,14 +545,9 @@ async def open_brokerd_trades_dialogue(
task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled
await trio.sleep_forever()
finally:
# parent context must have been closed
@ -561,15 +599,14 @@ async def translate_and_relay_brokerd_events(
broker ems
'error' -> log it locally (for now)
'status' -> relabel as 'broker_<status>', if complete send 'executed'
'fill' -> 'broker_filled'
('status' | 'fill'} -> relayed through see ``Status`` msg type.
Currently handled status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
'''
book = router.get_dark_book(broker)
relay = router.relays[broker]
book: _DarkBook = router.get_dark_book(broker)
relay: TradesRelay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream
@ -601,30 +638,16 @@ async def translate_and_relay_brokerd_events(
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
for client_stream in router.clients.copy():
try:
await client_stream.send(pos_msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
await router.client_broadcast(pos_msg)
continue
# BrokerdOrderAck
# initial response to brokerd order request
case {
'name': 'ack',
'reqid': reqid, # brokerd generated order-request id
'oid': oid, # ems order-dialog id
} if (
entry := book._ems_entries.get(oid)
):
# initial response to brokerd order request
# if name == 'ack':
}:
# register the brokerd request id (that was generated
# / created internally by the broker backend) with our
# local ems order id for reverse lookup later.
@ -639,23 +662,24 @@ async def translate_and_relay_brokerd_events(
# new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases:
# - the order has previously been requested to be
# 1. the order has previously been requested to be
# cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel
# signal **asap** to the backend broker
action = getattr(entry, 'action', None)
if action and action == 'cancel':
# status = book._active.get(oid)
status_msg = book._active[oid]
req = status_msg.req
if req and req.action == 'cancel':
# assign newly providerd broker backend request id
entry.reqid = reqid
# and tell broker to cancel immediately
status_msg.reqid = reqid
await brokerd_trades_stream.send(req)
# tell broker to cancel immediately
await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# 2. the order is now active and will be mirrored in
# our book -> registered as live flow
else:
# update the flow with the ack msg
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
# TODO: should we relay this ack state?
status_msg.resp = 'pending'
# no msg to client necessary
continue
@ -666,11 +690,9 @@ async def translate_and_relay_brokerd_events(
'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id
'symbol': sym,
'broker_details': details,
# 'reason': reason,
}:
} if status_msg := book._active.get(oid):
msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank?
# TODO: figure out how this will interact with EMS clients
@ -680,43 +702,64 @@ async def translate_and_relay_brokerd_events(
# some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
ems_client_order_stream = router.dialogues[oid]
status_msg.resp = 'error'
status_msg.brokerd_msg = msg
book._active[oid] = status_msg
await ems_client_order_stream.send(status_msg)
# BrokerdStatus
case {
'name': 'status',
'status': status,
'reqid': reqid, # brokerd generated order-request id
# TODO: feels like the wrong msg for this field?
'remaining': remaining,
} if (
oid := book._ems2brokerd_ids.inverse.get(reqid)
(oid := book._ems2brokerd_ids.inverse.get(reqid))
and status in (
'canceled',
'open',
'closed',
)
):
msg = BrokerdStatus(**brokerd_msg)
# TODO: should we flatten out these cases and/or should
# they maybe even eventually be separate messages?
if status == 'cancelled':
log.info(f'Cancellation for {oid} is complete!')
# TODO: maybe pack this into a composite type that
# contains both the IPC stream as well the
# msg-chain/dialog.
ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
old_resp = status_msg.resp
status_msg.resp = status
if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not remaining:
# retrieve existing live flow
old_reqid = status_msg.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
resp = 'broker_executed'
status_msg.reqid = reqid # THIS LINE IS CRITICAL!
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
await ems_client_order_stream.send(status_msg)
# be sure to pop this stream from our dialogue set
# since the order dialogue should be done.
if status == 'closed':
log.info(f'Execution for {oid} is complete!')
# just log it
else:
log.info(f'{broker} filled {msg}')
# only if we already rxed a fill then probably
# this clear is fully complete? (frickin ib..)
if old_resp == 'fill':
status_msg = book._active.pop(oid)
else:
# one of {submitted, cancelled}
resp = 'broker_' + msg.status
elif status == 'canceled':
log.cancel(f'Cancellation for {oid} is complete!')
else: # open
# relayed from backend but probably not handled so
# just log it
log.info(f'{broker} opened order {msg}')
# BrokerdFill
case {
@ -728,82 +771,111 @@ async def translate_and_relay_brokerd_events(
):
# proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg)
resp = 'broker_filled'
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
# unknown valid message case?
# case {
# 'name': name,
# 'symbol': sym,
# 'reqid': reqid, # brokerd generated order-request id
# # 'oid': oid, # ems order-dialog id
# 'broker_details': details,
ems_client_order_stream = router.dialogues[oid]
# } if (
# book._ems2brokerd_ids.inverse.get(reqid) is None
# ):
# # TODO: pretty sure we can drop this now?
# wtf a fill can come after 'closed' from ib?
status_msg = book._active[oid]
# # XXX: paper clearing special cases
# # paper engine race case: ``Client.submit_limit()`` hasn't
# # returned yet and provided an output reqid to register
# # locally, so we need to retreive the oid that was already
# # packed at submission since we already know it ahead of
# # time
# paper = details.get('paper_info')
# ext = details.get('external')
# only if we already rxed a 'closed'
# this clear is fully complete? (frickin ib..)
# if status_msg.resp == 'closed':
# status_msg = book._active.pop(oid)
# if paper:
# # paperboi keeps the ems id up front
# oid = paper['oid']
status_msg.resp = 'fill'
status_msg.reqid = reqid
status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
# elif ext:
# # may be an order msg specified as "external" to the
# # piker ems flow (i.e. generated by some other
# # external broker backend client (like tws for ib)
# log.error(f"External trade event {name}@{ext}")
# ``Status`` containing an embedded order msg which
# should be loaded as a "pre-existing open order" from the
# brokerd backend.
case {
'name': 'status',
'resp': status,
'reqid': reqid, # brokerd generated order-request id
}:
if (
status != 'open'
):
# TODO: check for an oid we might know since it was
# registered from a previous order/status load?
log.error(
f'Unknown/transient status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
# else:
# # something is out of order, we don't have an oid for
# # this broker-side message.
# log.error(
# f'Unknown oid: {oid} for msg {name}:\n'
# f'{pformat(brokerd_msg)}\n'
# 'Unable to relay message to client side!?'
# )
# TODO: we probably want some kind of "tagging" system
# for external order submissions like this eventually
# to be able to more formally handle multi-player
# trading...
else:
# existing open backend order which we broadcast to
# all currently connected clients.
log.info(
f'Relaying existing open order:\n {brokerd_msg}'
)
# continue
# use backend request id as our ems id though this
# may end up with collisions?
status_msg = Status(**brokerd_msg)
order = Order(**status_msg.req)
assert order.price and order.size
status_msg.req = order
assert status_msg.src # source tag?
oid = str(status_msg.reqid)
# attempt to avoid collisions
status_msg.reqid = oid
assert status_msg.resp == 'open'
# register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid
book._active[oid] = status_msg
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
await router.client_broadcast(status_msg)
# don't fall through
continue
# brokerd error
case {
'name': 'status',
'status': 'error',
}:
log.error(f'Broker error:\n{pformat(brokerd_msg)}')
# XXX: we presume the brokerd cancels its own order
# TOO FAST ``BrokerdStatus`` that arrives
# before the ``BrokerdAck``.
case {
# XXX: sometimes there is a race with the backend (like
# `ib` where the pending stauts will be related before
# the ack, in which case we just ignore the faster
# pending msg and wait for our expected ack to arrive
# later (i.e. the first block below should enter).
'name': 'status',
'status': status,
'reqid': reqid,
}:
status_msg = book._active[oid]
log.warning(
'Unhandled broker status for dialog:\n'
f'{pformat(status_msg)}\n'
f'{pformat(brokerd_msg)}\n'
)
case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow
entry = book._ems_entries[oid]
assert entry.oid == oid
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# XXX: ugh sometimes we don't access it?
if status_msg:
del status_msg
# TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the
@ -829,27 +901,36 @@ async def process_client_order_cmds(
async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}')
oid = cmd['oid']
# CAWT DAMN we need struct support!
oid = str(cmd['oid'])
# register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
# any dark/live status which is current
status = dark_book._active.get(oid)
match cmd:
# existing live-broker order cancel
case {
'action': 'cancel',
'oid': oid,
} if live_entry:
reqid = live_entry.reqid
msg = BrokerdCancel(
} if (
(status := dark_book._active.get(oid))
and status.resp in ('open', 'pending')
):
reqid = status.reqid
order = status.req
to_brokerd_msg = BrokerdCancel(
oid=oid,
reqid=reqid,
time_ns=time.time_ns(),
account=live_entry.account,
# account=live_entry.account,
account=order.account,
)
# NOTE: cancel response will be relayed back in messages
@ -859,39 +940,53 @@ async def process_client_order_cmds(
log.info(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg)
await brokerd_order_stream.send(to_brokerd_msg)
else:
# this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when
# the order ack does show up later such that the brokerd
# order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg
# dark_book._ems_entries[oid] = msg
# special case for now..
status.req = to_brokerd_msg
# dark trigger cancel
case {
'action': 'cancel',
'oid': oid,
} if not live_entry:
try:
} if (
status and status.resp == 'dark_open'
# or status and status.req
):
# remove from dark book clearing
dark_book.orders[symbol].pop(oid, None)
entry = dark_book.orders[symbol].pop(oid, None)
if entry:
(
pred,
tickfilter,
cmd,
percent_away,
abs_diff_away
) = entry
# tell client side that we've cancelled the
# dark-trigger order
await client_order_stream.send(
Status(
resp='dark_cancelled',
oid=oid,
time_ns=time.time_ns(),
)
)
status.resp = 'canceled'
status.req = cmd
await client_order_stream.send(status)
# de-register this client dialogue
router.dialogues.pop(oid)
dark_book._active.pop(oid)
except KeyError:
else:
log.exception(f'No dark order for {symbol}?')
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
# live order submission
case {
'oid': oid,
@ -899,11 +994,9 @@ async def process_client_order_cmds(
'price': trigger_price,
'size': size,
'action': ('buy' | 'sell') as action,
'exec_mode': 'live',
'exec_mode': ('live' | 'paper'),
}:
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
# TODO: relay this order msg directly?
req = Order(**cmd)
broker = req.brokers[0]
@ -912,13 +1005,13 @@ async def process_client_order_cmds(
# aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '')
if live_entry is not None:
# sanity check on emsd id
assert live_entry.oid == oid
reqid = live_entry.reqid
if status is not None:
# if we already had a broker order id then
# this is likely an order update commmand.
log.info(f"Modifying live {broker} order: {reqid}")
reqid = status.reqid
status.req = req
status.resp = 'pending'
msg = BrokerdOrder(
oid=oid, # no ib support for oids...
@ -935,6 +1028,18 @@ async def process_client_order_cmds(
account=req.account,
)
if status is None:
status = Status(
oid=oid,
reqid=reqid,
resp='pending',
time_ns=time.time_ns(),
brokerd_msg=msg,
req=req,
)
dark_book._active[oid] = status
# send request to backend
# XXX: the trades data broker response loop
# (``translate_and_relay_brokerd_events()`` above) will
@ -950,7 +1055,7 @@ async def process_client_order_cmds(
# client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel
# that live order asap.
dark_book._ems_entries[oid] = msg
# dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark-order / alert submission
case {
@ -966,9 +1071,11 @@ async def process_client_order_cmds(
# submit order to local EMS book and scan loop,
# effectively a local clearing engine, which
# scans for conditions and triggers matching executions
exec_mode in ('dark', 'paper')
exec_mode in ('dark',)
or action == 'alert'
):
req = Order(**cmd)
# Auto-gen scanner predicate:
# we automatically figure out what the alert check
# condition should be based on the current first
@ -1015,23 +1122,25 @@ async def process_client_order_cmds(
)[oid] = (
pred,
tickfilter,
cmd,
req,
percent_away,
abs_diff_away
)
resp = 'dark_submitted'
resp = 'dark_open'
# alerts have special msgs to distinguish
if action == 'alert':
resp = 'alert_submitted'
# if action == 'alert':
# resp = 'open'
await client_order_stream.send(
Status(
status = Status(
resp=resp,
oid=oid,
time_ns=time.time_ns(),
req=req,
src='dark',
)
)
dark_book._active[oid] = status
await client_order_stream.send(status)
@tractor.context
@ -1099,10 +1208,9 @@ async def _emsd_main(
):
# XXX: this should be initial price quote from target provider
first_quote = feed.first_quotes[fqsn]
book = _router.get_dark_book(broker)
book.lasts[fqsn] = first_quote['last']
first_quote: dict = feed.first_quotes[fqsn]
book: _DarkBook = _router.get_dark_book(broker)
book.lasts[fqsn]: float = first_quote['last']
# open a stream with the brokerd backend for order
# flow dialogue
@ -1129,12 +1237,25 @@ async def _emsd_main(
await ems_ctx.started((
relay.positions,
list(relay.accounts),
book._active,
))
# establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates
async with ems_ctx.open_stream() as ems_client_order_stream:
# register the client side before startingn the
# brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup.
_router.clients.add(ems_client_order_stream)
n.start_soon(
translate_and_relay_brokerd_events,
broker,
brokerd_stream,
_router,
)
# trigger scan and exec loop
n.start_soon(
clear_dark_triggers,
@ -1149,7 +1270,6 @@ async def _emsd_main(
# start inbound (from attached client) order request processing
try:
_router.clients.add(ems_client_order_stream)
# main entrypoint, run here until cancelled.
await process_client_order_cmds(

View File

@ -18,24 +18,92 @@
Clearing sub-system message and protocols.
"""
from typing import Optional, Union
# from collections import (
# ChainMap,
# deque,
# )
from typing import (
Optional,
Literal,
)
from ..data._source import Symbol
from ..data.types import Struct
# TODO: a composite for tracking msg flow on 2-legged
# dialogs.
# class Dialog(ChainMap):
# '''
# Msg collection abstraction to easily track the state changes of
# a msg flow in one high level, query-able and immutable construct.
# The main use case is to query data from a (long-running)
# msg-transaction-sequence
# '''
# def update(
# self,
# msg,
# ) -> None:
# self.maps.insert(0, msg.to_dict())
# def flatten(self) -> dict:
# return dict(self)
# TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
# - schema evolution:
# https://jcristharif.com/msgspec/usage.html#schema-evolution
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# --------------
# Client -> emsd
# --------------
class Order(Struct):
# TODO: ideally we can combine these 2 fields into
# 1 and just use the size polarity to determine a buy/sell.
# i would like to see this become more like
# https://jcristharif.com/msgspec/usage.html#literal
# action: Literal[
# 'live',
# 'dark',
# 'alert',
# ]
action: Literal[
'buy',
'sell',
'alert',
]
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: Literal[
'dark',
'live',
# 'paper', no right?
]
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: str | Symbol
account: str # should we set a default as '' ?
price: float
size: float # -ve is "sell", +ve is "buy"
brokers: Optional[list[str]] = []
class Cancel(Struct):
'''Cancel msg for removing a dark (ems triggered) or
'''
Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order.
'''
@ -44,32 +112,6 @@ class Cancel(Struct):
symbol: str
class Order(Struct):
# TODO: use ``msgspec.Literal``
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: Union[str, Symbol]
account: str # should we set a default as '' ?
price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float
brokers: list[str]
# Assigned once initial ack is received
# ack_time_ns: Optional[int] = None
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
# --------------
# Client <- emsd
# --------------
@ -79,37 +121,39 @@ class Order(Struct):
class Status(Struct):
name: str = 'status'
oid: str # uuid4
time_ns: int
oid: str # uuid4 ems-order dialog id
# {
# 'dark_submitted',
# 'dark_cancelled',
# 'dark_triggered',
# 'broker_submitted',
# 'broker_cancelled',
# 'broker_executed',
# 'broker_filled',
# 'broker_errored',
# 'alert_submitted',
# 'alert_triggered',
# }
resp: str # "response", see above
# trigger info
trigger_price: Optional[float] = None
# price: float
# broker: Optional[str] = None
resp: Literal[
'pending', # acked by broker but not yet open
'open',
'dark_open', # dark/algo triggered order is open in ems clearing loop
'triggered', # above triggered order sent to brokerd, or an alert closed
'closed', # fully cleared all size/units
'fill', # partial execution
'canceled',
'error',
]
# this maps normally to the ``BrokerdOrder.reqid`` below, an id
# normally allocated internally by the backend broker routing system
broker_reqid: Optional[Union[int, str]] = None
reqid: Optional[int | str] = None
# for relaying backend msg data "through" the ems layer
# the (last) source order/request msg if provided
# (eg. the Order/Cancel which causes this msg) and
# acts as a back-reference to the corresponding
# request message which was the source of this msg.
req: Optional[Order | Cancel] = None
# XXX: better design/name here?
# flag that can be set to indicate a message for an order
# event that wasn't originated by piker's emsd (eg. some external
# trading system which does it's own order control but that you
# might want to "track" using piker UIs/systems).
src: Optional[str] = None
# for relaying a boxed brokerd-dialog-side msg data "through" the
# ems layer to clients.
brokerd_msg: dict = {}
@ -131,25 +175,28 @@ class BrokerdCancel(Struct):
# for setting a unique order id then this value will be relayed back
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
# field
reqid: Optional[Union[int, str]] = None
reqid: Optional[int | str] = None
class BrokerdOrder(Struct):
action: str # {buy, sell}
oid: str
account: str
time_ns: int
# TODO: if we instead rely on a +ve/-ve size to determine
# the action we more or less don't need this field right?
action: str = '' # {buy, sell}
# "broker request id": broker specific/internal order id if this is
# None, creates a new order otherwise if the id is valid the backend
# api must modify the existing matching order. If the broker allows
# for setting a unique order id then this value will be relayed back
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
# field
reqid: Optional[Union[int, str]] = None
reqid: Optional[int | str] = None
symbol: str # symbol.<providername> ?
symbol: str # fqsn
price: float
size: float
@ -170,7 +217,7 @@ class BrokerdOrderAck(Struct):
name: str = 'ack'
# defined and provided by backend
reqid: Union[int, str]
reqid: int | str
# emsd id originally sent in matching request msg
oid: str
@ -180,30 +227,22 @@ class BrokerdOrderAck(Struct):
class BrokerdStatus(Struct):
name: str = 'status'
reqid: Union[int, str]
reqid: int | str
time_ns: int
status: Literal[
'open',
'canceled',
'fill',
'pending',
'error',
]
# XXX: should be best effort set for every update
account: str = ''
# TODO: instead (ack, pending, open, fill, clos(ed), cancelled)
# {
# 'submitted',
# 'cancelled',
# 'filled',
# }
status: str
account: str
filled: float = 0.0
reason: str = ''
remaining: float = 0.0
# XXX: better design/name here?
# flag that can be set to indicate a message for an order
# event that wasn't originated by piker's emsd (eg. some external
# trading system which does it's own order control but that you
# might want to "track" using piker UIs/systems).
external: bool = False
# external: bool = False
# XXX: not required schema as of yet
broker_details: dict = {
@ -218,7 +257,7 @@ class BrokerdFill(Struct):
'''
name: str = 'fill'
reqid: Union[int, str]
reqid: int | str
time_ns: int
# order exeuction related
@ -248,7 +287,7 @@ class BrokerdError(Struct):
# if no brokerd order request was actually submitted (eg. we errored
# at the ``pikerd`` layer) then there will be ``reqid`` allocated.
reqid: Optional[Union[int, str]] = None
reqid: Optional[int | str] = None
symbol: str
reason: str

View File

@ -45,8 +45,13 @@ from ..data._normalize import iterticks
from ..data._source import unpack_fqsn
from ..log import get_logger
from ._messages import (
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
BrokerdFill, BrokerdPosition, BrokerdError
BrokerdCancel,
BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdPosition,
BrokerdError,
)
@ -94,6 +99,10 @@ class PaperBoi:
'''
is_modify: bool = False
if action == 'alert':
# bypass all fill simulation
return reqid
entry = self._reqids.get(reqid)
if entry:
# order is already existing, this is a modify
@ -104,10 +113,6 @@ class PaperBoi:
# register order internally
self._reqids[reqid] = (oid, symbol, action, price)
if action == 'alert':
# bypass all fill simulation
return reqid
# TODO: net latency model
# we checkpoint here quickly particulalry
# for dark orders since we want the dark_executed
@ -119,7 +124,9 @@ class PaperBoi:
size = -size
msg = BrokerdStatus(
status='submitted',
status='open',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid,
time_ns=time.time_ns(),
filled=0.0,
@ -136,7 +143,14 @@ class PaperBoi:
) or (
action == 'sell' and (clear_price := self.last_bid[0]) >= price
):
await self.fake_fill(symbol, clear_price, size, action, reqid, oid)
await self.fake_fill(
symbol,
clear_price,
size,
action,
reqid,
oid,
)
else:
# register this submissions as a paper live order
@ -178,7 +192,9 @@ class PaperBoi:
await trio.sleep(0.05)
msg = BrokerdStatus(
status='cancelled',
status='canceled',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid,
time_ns=time.time_ns(),
broker_details={'name': 'paperboi'},
@ -230,25 +246,23 @@ class PaperBoi:
self._trade_ledger.update(fill_msg.to_dict())
if order_complete:
msg = BrokerdStatus(
reqid=reqid,
time_ns=time.time_ns(),
status='filled',
# account=f'paper_{self.broker}',
account='paper',
status='closed',
filled=size,
remaining=0 if order_complete else remaining,
broker_details={
'paper_info': {
'oid': oid,
},
'action': action,
'size': size,
'price': price,
'name': self.broker,
},
# broker_details={
# 'paper_info': {
# 'oid': oid,
# },
# 'action': action,
# 'size': size,
# 'price': price,
# 'name': self.broker,
# },
)
await self.ems_trades_stream.send(msg)
@ -393,30 +407,33 @@ async def handle_order_requests(
# order_request: dict
async for request_msg in ems_order_stream:
action = request_msg['action']
if action in {'buy', 'sell'}:
account = request_msg['account']
# action = request_msg['action']
match request_msg:
# if action in {'buy', 'sell'}:
case {'action': ('buy' | 'sell')}:
order = BrokerdOrder(**request_msg)
account = order.account
if account != 'paper':
log.error(
'This is a paper account,'
' only a `paper` selection is valid'
)
await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'],
symbol=request_msg['symbol'],
# oid=request_msg['oid'],
oid=order.oid,
# symbol=request_msg['symbol'],
symbol=order.symbol,
reason=f'Paper only. No account found: `{account}` ?',
))
continue
# validate
order = BrokerdOrder(**request_msg)
# order = BrokerdOrder(**request_msg)
if order.reqid is None:
reqid = str(uuid.uuid4())
else:
reqid = order.reqid
# if order.reqid is None:
# reqid =
# else:
reqid = order.reqid or str(uuid.uuid4())
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
@ -447,14 +464,14 @@ async def handle_order_requests(
reqid=reqid,
)
elif action == 'cancel':
# elif action == 'cancel':
case {'action': 'cancel'}:
msg = BrokerdCancel(**request_msg)
await client.submit_cancel(
reqid=msg.reqid
)
else:
case _:
log.error(f'Unknown order command: {request_msg}')

View File

@ -37,7 +37,7 @@ import time
from math import isnan
from bidict import bidict
import msgpack
from msgspec.msgpack import encode, decode
import pyqtgraph as pg
import numpy as np
import tractor
@ -774,12 +774,13 @@ async def stream_quotes(
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server
resp = await ws.send_message(
msgpack.dumps({'streams': list(tbks.values())})
encode({'streams': list(tbks.values())})
)
log.info(resp)
async def recv() -> dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8')
return decode((await ws.get_message()), encoding='utf-8')
streams = (await recv())['streams']
log.info(f"Subscribed to {streams}")

View File

@ -18,6 +18,7 @@
Built-in (extension) types.
"""
import sys
from typing import Optional
from pprint import pformat
@ -42,8 +43,13 @@ class Struct(
}
def __repr__(self):
# only turn on pprint when we detect a python REPL
# at runtime B)
if hasattr(sys, 'ps1'):
return f'Struct({pformat(self.to_dict())})'
return super().__repr__()
def copy(
self,
update: Optional[dict] = None,

View File

@ -134,6 +134,8 @@ class Position(Struct):
# unique backend symbol id
bsuid: str
split_ratio: Optional[int] = None
# ordered record of known constituent trade messages
clears: dict[
Union[str, int, Status], # trade id
@ -159,6 +161,9 @@ class Position(Struct):
clears = d.pop('clears')
expiry = d.pop('expiry')
if self.split_ratio is None:
d.pop('split_ratio')
# TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing
# it via the trades ledger..
@ -384,12 +389,22 @@ class Position(Struct):
asize_h.append(accum_size)
ppu_h.append(ppu_h[-1])
return ppu_h[-1] if ppu_h else 0
final_ppu = ppu_h[-1] if ppu_h else 0
# handle any split info entered (for now) manually by user
if self.split_ratio is not None:
final_ppu /= self.split_ratio
return final_ppu
def calc_size(self) -> float:
size: float = 0
for tid, entry in self.clears.items():
size += entry['size']
if self.split_ratio is not None:
size = round(size * self.split_ratio)
return size
def minimize_clears(
@ -848,6 +863,7 @@ def open_pps(
size = entry['size']
# TODO: remove but, handle old field name for now
ppu = entry.get('ppu', entry.get('be_price', 0))
split_ratio = entry.get('split_ratio')
expiry = entry.get('expiry')
if expiry:
@ -857,6 +873,7 @@ def open_pps(
Symbol.from_fqsn(fqsn, info={}),
size=size,
ppu=ppu,
split_ratio=split_ratio,
expiry=expiry,
bsuid=entry['bsuid'],

View File

@ -140,9 +140,9 @@ class LineEditor:
) -> LevelLine:
staged_line = self._active_staged_line
if not staged_line:
raise RuntimeError("No line is currently staged!?")
# staged_line = self._active_staged_line
# if not staged_line:
# raise RuntimeError("No line is currently staged!?")
# for now, until submission reponse arrives
line.hide_labels()

View File

@ -49,16 +49,21 @@ from ._position import (
SettingsPane,
)
from ._forms import FieldsForm
# from ._label import FormatLabel
from ._window import MultiStatus
from ..clearing._messages import Order, BrokerdPosition
from ..clearing._messages import (
Order,
Status,
# BrokerdOrder,
# BrokerdStatus,
BrokerdPosition,
)
from ._forms import open_form_input_handling
log = get_logger(__name__)
class OrderDialog(Struct):
class Dialog(Struct):
'''
Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart.
@ -141,7 +146,7 @@ class OrderMode:
current_pp: Optional[PositionTracker] = None
active: bool = False
name: str = 'order'
dialogs: dict[str, OrderDialog] = field(default_factory=dict)
dialogs: dict[str, Dialog] = field(default_factory=dict)
_colors = {
'alert': 'alert_yellow',
@ -152,10 +157,7 @@ class OrderMode:
def line_from_order(
self,
order: Order,
symbol: Symbol,
**line_kwargs,
) -> LevelLine:
@ -173,8 +175,8 @@ class OrderMode:
color=self._colors[order.action],
dotted=True if (
order.exec_mode == 'dark' and
order.action != 'alert'
order.exec_mode == 'dark'
and order.action != 'alert'
) else False,
**line_kwargs,
@ -236,7 +238,6 @@ class OrderMode:
line = self.line_from_order(
order,
symbol,
show_markers=True,
# just for the stage line to avoid
@ -262,25 +263,28 @@ class OrderMode:
def submit_order(
self,
send_msg: bool = True,
order: Optional[Order] = None,
) -> OrderDialog:
) -> Dialog:
'''
Send execution order to EMS return a level line to
represent the order on a chart.
'''
if not order:
staged = self._staged_order
symbol: Symbol = staged.symbol
oid = str(uuid.uuid4())
# symbol: Symbol = staged.symbol
# format order data for ems
order = staged.copy()
order.oid = oid
order.symbol = symbol.front_fqsn()
order.symbol = order.symbol.front_fqsn()
line = self.line_from_order(
order,
symbol,
show_markers=True,
only_show_markers_on_hover=True,
@ -298,17 +302,17 @@ class OrderMode:
# color once the submission ack arrives.
self.lines.submit_line(
line=line,
uuid=oid,
uuid=order.oid,
)
dialog = OrderDialog(
uuid=oid,
dialog = Dialog(
uuid=order.oid,
order=order,
symbol=symbol,
symbol=order.symbol,
line=line,
last_status_close=self.multistatus.open_status(
f'submitting {self._trigger_type}-{order.action}',
final_msg=f'submitted {self._trigger_type}-{order.action}',
f'submitting {order.exec_mode}-{order.action}',
final_msg=f'submitted {order.exec_mode}-{order.action}',
clear_on_next=True,
)
)
@ -318,14 +322,21 @@ class OrderMode:
# enter submission which will be popped once a response
# from the EMS is received to move the order to a different# status
self.dialogs[oid] = dialog
self.dialogs[order.oid] = dialog
# hook up mouse drag handlers
line._on_drag_start = self.order_line_modify_start
line._on_drag_end = self.order_line_modify_complete
# send order cmd to ems
if send_msg:
self.book.send(order)
else:
# just register for control over this order
# TODO: some kind of mini-perms system here based on
# an out-of-band tagging/auth sub-sys for multiplayer
# order control?
self.book._sent_orders[order.oid] = order
return dialog
@ -363,7 +374,7 @@ class OrderMode:
self,
uuid: str
) -> OrderDialog:
) -> Dialog:
'''
Order submitted status event handler.
@ -418,7 +429,7 @@ class OrderMode:
self,
uuid: str,
msg: Dict[str, Any],
msg: Status,
) -> None:
@ -442,7 +453,7 @@ class OrderMode:
# TODO: add in standard fill/exec info that maybe we
# pack in a broker independent way?
f'{msg["resp"]}: {msg["trigger_price"]}',
f'{msg.resp}: {msg.req.price}',
],
)
log.runtime(result)
@ -502,7 +513,7 @@ class OrderMode:
oid = dialog.uuid
cancel_status_close = self.multistatus.open_status(
f'cancelling order {oid[:6]}',
f'cancelling order {oid}',
group_key=key,
)
dialog.last_status_close = cancel_status_close
@ -512,6 +523,45 @@ class OrderMode:
return ids
def load_unknown_dialog_from_msg(
self,
msg: Status,
) -> Dialog:
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
order = Order(**msg.req)
oid = str(msg.oid)
symbol = order.symbol
# TODO: MEGA UGGG ZONEEEE!
src = msg.src
if (
src
and src != 'dark'
and src not in symbol
):
fqsn = symbol + '.' + src
brokername = src
else:
fqsn = symbol
*head, brokername = fqsn.rsplit('.')
# fill out complex fields
order.oid = str(order.oid)
order.brokers = [brokername]
order.symbol = Symbol.from_fqsn(
fqsn=fqsn,
info={},
)
dialog = self.submit_order(
send_msg=False,
order=order,
)
assert self.dialogs[oid] == dialog
return dialog
@asynccontextmanager
async def open_order_mode(
@ -549,6 +599,7 @@ async def open_order_mode(
trades_stream,
position_msgs,
brokerd_accounts,
ems_dialog_msgs,
),
trio.open_nursery() as tn,
@ -596,10 +647,10 @@ async def open_order_mode(
sym = msg['symbol']
if (
sym == symkey or
# mega-UGH, i think we need to fix the FQSN stuff sooner
# then later..
sym == symkey.removesuffix(f'.{broker}')
(sym == symkey) or (
# mega-UGH, i think we need to fix the FQSN
# stuff sooner then later..
sym == symkey.removesuffix(f'.{broker}'))
):
pps_by_account[acctid] = msg
@ -703,7 +754,7 @@ async def open_order_mode(
order_pane.order_mode = mode
# select a pp to track
tracker = trackers[pp_account]
tracker: PositionTracker = trackers[pp_account]
mode.current_pp = tracker
tracker.show()
tracker.hide_info()
@ -755,38 +806,61 @@ async def open_order_mode(
# to handle input since the ems connection is ready
started.set()
for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side?
# msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid'])
msg['brokerd_msg'] = msg
await process_trade_msg(
mode,
book,
msg,
)
tn.start_soon(
process_trades_and_update_ui,
tn,
feed,
mode,
trades_stream,
mode,
book,
)
yield mode
async def process_trades_and_update_ui(
n: trio.Nursery,
feed: Feed,
mode: OrderMode,
trades_stream: tractor.MsgStream,
mode: OrderMode,
book: OrderBook,
) -> None:
get_index = mode.chart.get_index
global _pnl_tasks
# this is where we receive **back** messages
# about executions **from** the EMS actor
async for msg in trades_stream:
await process_trade_msg(
mode,
book,
msg,
)
async def process_trade_msg(
mode: OrderMode,
book: OrderBook,
msg: dict,
) -> tuple[Dialog, Status]:
get_index = mode.chart.get_index
fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}')
log.debug(f'Received order msg:\n{fmsg}')
name = msg['name']
if name in (
'position',
):
@ -811,95 +885,102 @@ async def process_trades_and_update_ui(
# short circuit to next msg to avoid
# unnecessary msg content lookups
continue
return
resp = msg['resp']
oid = msg['oid']
msg = Status(**msg)
resp = msg.resp
oid = msg.oid
dialog: Dialog = mode.dialogs.get(oid)
dialog = mode.dialogs.get(oid)
if dialog is None:
log.warning(f'received msg for untracked dialog:\n{fmsg}')
# TODO: enable pure tracking / mirroring of dialogs
# is desired.
continue
# record message to dialog tracking
dialog.msgs[oid] = msg
# response to 'action' request (buy/sell)
if resp in (
'dark_submitted',
'broker_submitted'
):
match msg:
case Status(resp='dark_open' | 'open'):
if dialog is not None:
# show line label once order is live
mode.on_submit(oid)
# resp to 'cancel' request or error condition
# for action request
elif resp in (
'broker_inactive',
'broker_errored',
else:
log.warning(
f'received msg for untracked dialog:\n{fmsg}'
)
assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}'
sym = mode.chart.linked.symbol
fqsn = sym.front_fqsn()
order = Order(**msg.req)
if (
((order.symbol + f'.{msg.src}') == fqsn)
# a existing dark order for the same symbol
or (
order.symbol == fqsn
and (msg.src == 'dark') or (msg.src in fqsn)
)
):
dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid)
# return dialog, msg
case Status(resp='error'):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
broker_msg = msg.brokerd_msg
log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
elif resp in (
'broker_cancelled',
'dark_cancelled'
):
case Status(resp='canceled'):
# delete level line from view
mode.on_cancel(oid)
broker_msg = msg['brokerd_msg']
log.cancel(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
req = Order(**msg.req)
log.cancel(f'Canceled {req.action}:{oid}')
elif resp in (
'dark_triggered'
case Status(
resp='triggered',
# req=Order(exec_mode='dark') # TODO:
req={'exec_mode': 'dark'},
):
# TODO: UX for a "pending" clear/live order
log.info(f'Dark order triggered for {fmsg}')
elif resp in (
'alert_triggered'
case Status(
resp='triggered',
# req=Order(exec_mode='live', action='alert') as req, # TODO
req={'exec_mode': 'live', 'action': 'alert'} as req,
):
# should only be one "fill" for an alert
# add a triangle and remove the level line
req = Order(**req)
mode.on_fill(
oid,
price=msg['trigger_price'],
price=req.price,
arrow_index=get_index(time.time()),
)
mode.lines.remove_line(uuid=oid)
msg.req = req
await mode.on_exec(oid, msg)
# response to completed 'action' request for buy/sell
elif resp in (
'broker_executed',
# response to completed 'dialog' for order request
case Status(
resp='closed',
# req=Order() as req, # TODO
req=req,
):
# right now this is just triggering a system alert
msg.req = Order(**req)
await mode.on_exec(oid, msg)
if msg['brokerd_msg']['remaining'] == 0:
mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually
elif resp in (
'broker_filled',
):
case Status(resp='fill'):
# handle out-of-piker fills reporting?
known_order = book._sent_orders.get(oid)
if not known_order:
log.warning(f'order {oid} is unknown')
continue
return
action = known_order.action
details = msg['brokerd_msg']
details = msg.brokerd_msg
# TODO: some kinda progress system
mode.on_fill(
@ -914,3 +995,9 @@ async def process_trades_and_update_ui(
# TODO: how should we look this up?
# tracker = mode.trackers[msg['account']]
# tracker.live_pp.fills.append(msg)
# record message to dialog tracking
if dialog:
dialog.msgs[oid] = msg
return dialog, msg