Compare commits

...

21 Commits

Author SHA1 Message Date
Tyler Goodlet 2f6e3ad03f Add dict differ helpers from SO answer 2022-08-11 16:18:05 -04:00
Tyler Goodlet b75683879a Only pprint our struct when we detect a py REPL 2022-08-11 15:56:28 -04:00
Tyler Goodlet db8a3dd1b7 Move fill case-block earlier, log broker errors 2022-08-11 14:26:34 -04:00
Tyler Goodlet 2d92ed2052 Drop `msgpack` from `marketstore` module 2022-08-11 14:21:01 -04:00
Tyler Goodlet 0756cb0289 Load boxed `.req` values as `Order`s in mode loop 2022-08-11 14:20:23 -04:00
Tyler Goodlet 66f7dd9020 'Only send `'closed'` on Filled events, lowercase all statues' 2022-08-11 14:18:53 -04:00
Tyler Goodlet 9782107153 First try mega-basic stock (reverse) split support with `ib` and `pps.toml` 2022-08-10 18:19:44 -04:00
Tyler Goodlet 1f43f660fe Passthrough filled and pendingsubmit cases 2022-08-10 18:03:25 -04:00
Tyler Goodlet d3b7d0e247 Include both symbols in error msg when a mismatch 2022-08-10 17:59:27 -04:00
Tyler Goodlet 700dbf0e2b Handle 'closed' vs. 'fill` race case..
`ib` is super good not being reliable with order event sequence order
and duplication of fill info. This adds some guards to try and avoid
popping the last status status too early if we end up receiving
a `'closed'` before the expected `'fill`' event(s). Further delete the
`status_msg` ref on each iteration to avoid stale reference lookups in
the relay task/loop.
2022-08-10 17:17:47 -04:00
Tyler Goodlet b52c4092f3 Use modern `Union` pipe op syntax for msg fields 2022-08-10 16:41:00 -04:00
Tyler Goodlet 7fe3e3f482 Add full EMS order-dialog (re-)load support!
This includes darks, lives and alerts with all connecting clients
being broadcast all existing order-flow dialog states. Obviously
for now darks and alerts only live as long as the `emsd` actor lifetime
(though we will store these in local state eventually) and "live" orders
have lifetimes managed by their respective backend broker.

The details of this change-set is extensive, so here we go..

Messaging schema:
- change the messaging `Status` status-key set to:
  `resp: Literal['pending', 'open', 'dark_open', 'triggered',
                'closed',  'fill', 'canceled', 'error']`

  which better reflects the semantics of order lifetimes and was
  partially inspired by the status keys `kraken` provides for their
  order-entry API. The prior key set was based on `ib`'s horrible
  semantics which sound like they're right out of the 80s..
  Also, we reflect this same set in the `BrokerdStatus` msg and likely
  we'll just get rid of the separate brokerd-dialog side type
  eventually.
- use `Literal` type annots for statuses where applicable and as they
  are supported by `msgspec`.
- add additional optional `Status` fields:
  -`req: Order` to allow each status msg to optionally ref its
    commanding order-request msg allowing at least a request-response
    style implicit tracing in all response msgs.
  -`src: str` tag string to show the source of the msg.
  -`reqid: str | int` such that the ems can relay the `brokerd`
    request id both to the client side and have one spot to look
    up prior status msgs and
- draft a (unused/commented) `Dialog` type which can be eventually used
  at all EMS endpoints to track msg-flow states

EMS engine adjustments/rework:
- use the new status key set throughout and expect `BrokerdStatus` msgs
  to use the same new schema as `Status`.
- add a `_DarkBook._active: dict[str, Status]` table which is now used for
  all per-leg-dialog associations and order flow state tracking
  allowing for the both the brokerd-relay and client-request handler loops
  to read/write the same msg-table and provides for delivering
  the overall EMS-active-orders state to newly/re-connecting clients
  with minimal processing; this table replaces what the `._ems_entries`
  table from prior.
- add `Router.client_broadcast()` to send a msg to all currently
  connected peers.
- a variety of msg handler block logic tweaks including more `case:`
  blocks to be both flatter and improve explicitness:
  - for the relay loop move all `Status` msg update and sending to
    within each block instead of a fallthrough case plus hard-to-follow
    state logic.
  - add a specific case for unhandled backend status keys and just log
    them.
  - pop alerts from `._active` immediately once triggered.
  - where possible mutate status msgs fields over instantiating new
    ones.
- insert and expect `Order` instances in the dark clearing loop and
  adjust `case:` blocks accordingly.
- tag `dark_open` and `triggered` statuses as sourced from the ems.
- drop all the `ChainMap` stuff for now; we're going to make our own
  `Dialog` type for this purpose..

Order mode rework:
- always parse the `Status` msg and use match syntax cases with object
  patterns, hackily assign the `.req` in many blocks to work around not
  yet having proper on-the-wire decoding yet.
- make `.load_unknown_dialog_from_msg()` expect a `Status` with boxed
  `.req: Order` as input.
- change `OrderDialog` -> `Dialog` in prep for a general purpose type
  of the same name.

`ib` backend order loading support:
- do "closed" status detection inside the msg-relay loop instead
  of expecting the ems to do this..
- add an attempt to cancel inactive orders by scheduling cancel
  submissions continually (no idea if this works).
- add a status map to go from the 80s keys to our new set.
- deliver `Status` msgs with an embedded `Order` for existing live order
  loading and make sure to try an get the source exchange info (instead
  of SMART).

Paper engine ported to match:
- use new status keys in `BrokerdStatus` msgs
- use `match:` syntax in request handler loop
2022-08-10 13:38:23 -04:00
Tyler Goodlet bbbdcad33b WIP playing with a `ChainMap` of messages 2022-08-08 13:47:41 -04:00
Tyler Goodlet a3812cd169 Fix for TWS created position loading 2022-08-08 13:47:17 -04:00
Tyler Goodlet 5ac5743c66 Deliver existing dialog (msgs) to every EMS client
Ideally every client that connects to the ems can know its state
(immediately) meaning relay all the order dialogs that are currently
active. This adds full (hacky WIP) support to receive those dialog
(msgs) from the `open_ems()` startup values via the `.started()` msg
from `_emsd_main()`.

Further this adds support to the order mode chart-UI to display existing
(live) orders on the chart during startup. Details include,

- add a `OrderMode.load_unknown_dialog_from_msg()` for processing and
  displaying a ``BrokerdStatus`` (for now) msg from the EMS that was not
  previously created by the current ems client and registering and
  displaying it on the chart.
- break out the ems msg processing into a new
  `order_mode.process_trade_msg()` func so that it can be called on the
  startup dialog-msg set as well as eventually used a more general low
  level auto-strat API (eg. when we get to displaying auto-strat and
  group trading automatically on an observing chart UI.
- hackyness around msg-processing for the dialogs delivery since we're
  technically delivering `BrokerdStatus` msgs when the client-side
  processing technically expects `Status` msgs.. we'll rectify this
  soon!
2022-08-05 21:04:31 -04:00
Tyler Goodlet aa204228ab Lol, handle failed-to-cancel statuses.. 2022-08-05 21:04:31 -04:00
Tyler Goodlet 0bd8f2bcd9 Start brokerd relay loop after opening client stream
In order to avoid missed existing order message emissions on startup we
need to be sure the client side stream is registered with the router
first. So break out the starting of the
`translate_and_relay_brokerd_events()` task until inside the client
stream block and start the task using the dark clearing loop nursery.

Also, ensure `oid` (and thus for `ib` the equivalent re-used `reqid`)
are cast to `str` before registering the dark book. Deliver the dark
book entries as part of the `_emsd_main()` context `.started()` values.
2022-08-05 21:04:31 -04:00
Tyler Goodlet 334f512ad3 Always cast ems `requid` values to `int` 2022-08-05 21:04:31 -04:00
Tyler Goodlet 71cca4ceda Drop staged line runtime guard 2022-08-05 21:04:31 -04:00
Tyler Goodlet 0d332427e2 First draft: relay open orders through ems and display on chart 2022-08-05 21:04:31 -04:00
Tyler Goodlet 02980282cd Relay existing open orders from ib on startup 2022-08-05 21:04:31 -04:00
11 changed files with 1016 additions and 526 deletions

View File

@ -36,8 +36,6 @@ from trio_typing import TaskStatus
import tractor import tractor
from ib_insync.contract import ( from ib_insync.contract import (
Contract, Contract,
# Option,
# Forex,
) )
from ib_insync.order import ( from ib_insync.order import (
Trade, Trade,
@ -61,6 +59,8 @@ from piker.pp import (
) )
from piker.log import get_console_log from piker.log import get_console_log
from piker.clearing._messages import ( from piker.clearing._messages import (
Order,
Status,
BrokerdOrder, BrokerdOrder,
BrokerdOrderAck, BrokerdOrderAck,
BrokerdStatus, BrokerdStatus,
@ -123,11 +123,13 @@ async def handle_order_requests(
f'An IB account number for name {account} is not found?\n' f'An IB account number for name {account} is not found?\n'
'Make sure you have all TWS and GW instances running.' 'Make sure you have all TWS and GW instances running.'
) )
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(
BrokerdError(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
)) )
)
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -147,6 +149,14 @@ async def handle_order_requests(
# validate # validate
order = BrokerdOrder(**request_msg) order = BrokerdOrder(**request_msg)
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid = order.reqid
if reqid is not None:
reqid = int(reqid)
# call our client api to submit the order # call our client api to submit the order
reqid = client.submit_limit( reqid = client.submit_limit(
oid=order.oid, oid=order.oid,
@ -155,12 +165,7 @@ async def handle_order_requests(
action=order.action, action=order.action,
size=order.size, size=order.size,
account=acct_number, account=acct_number,
reqid=reqid,
# XXX: by default 0 tells ``ib_insync`` methods that
# there is no existing order so ask the client to create
# a new one (which it seems to do by allocating an int
# counter - collision prone..)
reqid=order.reqid,
) )
if reqid is None: if reqid is None:
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(BrokerdError(
@ -180,9 +185,9 @@ async def handle_order_requests(
) )
) )
elif action == 'cancel': if action == 'cancel':
msg = BrokerdCancel(**request_msg) msg = BrokerdCancel(**request_msg)
client.submit_cancel(reqid=msg.reqid) client.submit_cancel(reqid=int(msg.reqid))
else: else:
log.error(f'Unknown order command: {request_msg}') log.error(f'Unknown order command: {request_msg}')
@ -357,11 +362,24 @@ async def update_and_audit_msgs(
# presume we're at least not more in the shit then we # presume we're at least not more in the shit then we
# thought. # thought.
if diff: if diff:
reverse_split_ratio = pikersize / ibsize
split_ratio = 1/reverse_split_ratio
if split_ratio >= reverse_split_ratio:
entry = f'split_ratio = {int(split_ratio)}'
else:
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
raise ValueError( raise ValueError(
f'POSITION MISMATCH ib <-> piker ledger:\n' f'POSITION MISMATCH ib <-> piker ledger:\n'
f'ib: {ibppmsg}\n' f'ib: {ibppmsg}\n'
f'piker: {msg}\n' f'piker: {msg}\n'
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' f'reverse_split_ratio: {reverse_split_ratio}\n'
f'split_ratio: {split_ratio}\n\n'
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
'If you are expecting a (reverse) split in this '
'instrument you should probably put the following '
f'in the `pps.toml` section:\n{entry}'
) )
msg.size = ibsize msg.size = ibsize
@ -439,7 +457,6 @@ async def trades_dialogue(
# we might also want to delegate a specific actor for # we might also want to delegate a specific actor for
# ledger writing / reading for speed? # ledger writing / reading for speed?
async with ( async with (
# trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients), open_client_proxies() as (proxies, aioclients),
): ):
# Open a trade ledgers stack for appending trade records over # Open a trade ledgers stack for appending trade records over
@ -468,6 +485,52 @@ async def trades_dialogue(
client = aioclients[account] client = aioclients[account]
trades: list[Trade] = client.ib.openTrades()
order_msgs = []
for trade in trades:
order = trade.order
quant = trade.order.totalQuantity
action = order.action.lower()
size = {
'sell': -1,
'buy': 1,
}[action] * quant
con = trade.contract
# TODO: in the case of the SMART venue (aka ib's
# router-clearing sys) we probably should handle
# showing such orders overtop of the fqsn for the
# primary exchange, how to map this easily is going
# to be a bit tricky though?
deats = await proxy.con_deats(contracts=[con])
fqsn = list(deats)[0]
reqid = order.orderId
# TODO: maybe embed a ``BrokerdOrder`` instead
# since then we can directly load it on the client
# side in the order mode loop?
msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=str(reqid),
reqid=reqid,
# embedded order info
req=Order(
action=action,
exec_mode='live',
oid=str(reqid),
symbol=fqsn,
account=accounts_def.inverse[order.account],
price=order.lmtPrice,
size=size,
),
src='ib',
)
order_msgs.append(msg)
# process pp value reported from ib's system. we only use these # process pp value reported from ib's system. we only use these
# to cross-check sizing since average pricing on their end uses # to cross-check sizing since average pricing on their end uses
# the so called (bs) "FIFO" style which more or less results in # the so called (bs) "FIFO" style which more or less results in
@ -480,6 +543,7 @@ async def trades_dialogue(
# sure know which positions to update from the ledger if # sure know which positions to update from the ledger if
# any are missing from the ``pps.toml`` # any are missing from the ``pps.toml``
bsuid, msg = pack_position(pos) bsuid, msg = pack_position(pos)
acctid = msg.account = accounts_def.inverse[msg.account] acctid = msg.account = accounts_def.inverse[msg.account]
acctid = acctid.strip('ib.') acctid = acctid.strip('ib.')
cids2pps[(acctid, bsuid)] = msg cids2pps[(acctid, bsuid)] = msg
@ -493,9 +557,7 @@ async def trades_dialogue(
or pp.size != msg.size or pp.size != msg.size
): ):
trans = norm_trade_records(ledger) trans = norm_trade_records(ledger)
updated = table.update_from_trans(trans) table.update_from_trans(trans)
pp = updated[bsuid]
# update trades ledgers for all accounts from connected # update trades ledgers for all accounts from connected
# api clients which report trades for **this session**. # api clients which report trades for **this session**.
trades = await proxy.trades() trades = await proxy.trades()
@ -521,9 +583,28 @@ async def trades_dialogue(
trans = trans_by_acct.get(acctid) trans = trans_by_acct.get(acctid)
if trans: if trans:
table.update_from_trans(trans) table.update_from_trans(trans)
updated = table.update_from_trans(trans)
assert msg.size == pp.size, 'WTF' # XXX: not sure exactly why it wouldn't be in
# the updated output (maybe this is a bug?) but
# if you create a pos from TWS and then load it
# from the api trades it seems we get a key
# error from ``update[bsuid]`` ?
pp = table.pps.get(bsuid)
if not pp:
log.error(
f'The contract id for {msg} may have '
f'changed to {bsuid}\nYou may need to '
'adjust your ledger for this, skipping '
'for now.'
)
continue
if msg.size != pp.size:
log.error(
'Position mismatch {pp.symbol.front_fqsn()}:\n'
f'ib: {msg.size}\n'
f'piker: {pp.size}\n'
)
active_pps, closed_pps = table.dump_active() active_pps, closed_pps = table.dump_active()
@ -575,6 +656,10 @@ async def trades_dialogue(
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
# relay existing open orders to ems
for msg in order_msgs:
await ems_stream.send(msg)
trade_event_stream = await n.start(open_trade_event_stream) trade_event_stream = await n.start(open_trade_event_stream)
clients.append((client, trade_event_stream)) clients.append((client, trade_event_stream))
@ -586,6 +671,7 @@ async def trades_dialogue(
for client, stream in clients: for client, stream in clients:
n.start_soon( n.start_soon(
deliver_trade_events, deliver_trade_events,
n,
stream, stream,
ems_stream, ems_stream,
accounts_def, accounts_def,
@ -661,8 +747,24 @@ async def emit_pp_update(
await ems_stream.send(msg) await ems_stream.send(msg)
_statuses: dict[str, str] = {
'cancelled': 'canceled',
'submitted': 'open',
# XXX: just pass these through? it duplicates actual fill events other
# then the case where you the `.remaining == 0` case which is our
# 'closed'` case.
# 'filled': 'pending',
# 'pendingsubmit': 'pending',
# TODO: see a current ``ib_insync`` issue around this:
# https://github.com/erdewit/ib_insync/issues/363
'inactive': 'pending',
}
async def deliver_trade_events( async def deliver_trade_events(
nurse: trio.Nursery,
trade_event_stream: trio.MemoryReceiveChannel, trade_event_stream: trio.MemoryReceiveChannel,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'` accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
@ -718,6 +820,45 @@ async def deliver_trade_events(
# unwrap needed data from ib_insync internal types # unwrap needed data from ib_insync internal types
trade: Trade = item trade: Trade = item
status: OrderStatus = trade.orderStatus status: OrderStatus = trade.orderStatus
ib_status_key = status.status.lower()
acctid = accounts_def.inverse[trade.order.account]
# double check there is no error when
# cancelling.. gawwwd
if ib_status_key == 'cancelled':
last_log = trade.log[-1]
if (
last_log.message
and 'Error' not in last_log.message
):
ib_status_key = trade.log[-2].status
print(ib_status_key)
elif ib_status_key == 'inactive':
async def sched_cancel():
log.warning(
'OH GAWD an inactive order..scheduling a cancel\n'
f'{pformat(item)}'
)
proxy = proxies[acctid]
await proxy.submit_cancel(reqid=trade.order.orderId)
await trio.sleep(1)
nurse.start_soon(sched_cancel)
nurse.start_soon(sched_cancel)
status_key = (
_statuses.get(ib_status_key)
or ib_status_key.lower()
)
remaining = status.remaining
if (
status_key == 'filled'
and remaining == 0
):
status_key = 'closed'
# skip duplicate filled updates - we get the deats # skip duplicate filled updates - we get the deats
# from the execution details event # from the execution details event
@ -728,14 +869,14 @@ async def deliver_trade_events(
account=accounts_def.inverse[trade.order.account], account=accounts_def.inverse[trade.order.account],
# everyone doin camel case.. # everyone doin camel case..
status=status.status.lower(), # force lower case status=status_key, # force lower case
filled=status.filled, filled=status.filled,
reason=status.whyHeld, reason=status.whyHeld,
# this seems to not be necessarily up to date in the # this seems to not be necessarily up to date in the
# execDetails event.. so we have to send it here I guess? # execDetails event.. so we have to send it here I guess?
remaining=status.remaining, remaining=remaining,
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
@ -870,17 +1011,25 @@ async def deliver_trade_events(
if err['reqid'] == -1: if err['reqid'] == -1:
log.error(f'TWS external order error:\n{pformat(err)}') log.error(f'TWS external order error:\n{pformat(err)}')
# TODO: what schema for this msg if we're going to make it # TODO: we don't want to relay data feed / lookup errors
# portable across all backends? # so we need some further filtering logic here..
# msg = BrokerdError(**err) # for most cases the 'status' block above should take
# care of this.
# await ems_stream.send(BrokerdStatus(
# status='error',
# reqid=err['reqid'],
# reason=err['reason'],
# time_ns=time.time_ns(),
# account=accounts_def.inverse[trade.order.account],
# broker_details={'name': 'ib'},
# ))
case 'position': case 'position':
cid, msg = pack_position(item) cid, msg = pack_position(item)
log.info(f'New IB position msg: {msg}') log.info(f'New IB position msg: {msg}')
# acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg) continue
case 'event': case 'event':

View File

@ -101,3 +101,30 @@ def percent_change(
new: float, new: float,
) -> float: ) -> float:
return pnl(init, new) * 100. return pnl(init, new) * 100.
def diff_dict(
d1: dict,
d2: dict,
) -> dict:
d1_keys = set(d1.keys())
d2_keys = set(d2.keys())
shared_keys = d1_keys.intersection(d2_keys)
shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]}
added_keys = d2_keys - d1_keys
added_deltas = {o: (None, d2[o]) for o in added_keys}
deltas = {**shared_deltas, **added_deltas}
return parse_deltas(deltas)
def parse_deltas(deltas: dict) -> dict:
res = {}
for k, v in deltas.items():
if isinstance(v[0], dict):
tmp = diff_dict(v[0], v[1])
if tmp:
res[k] = tmp
else:
res[k] = v[1]
return res

View File

@ -83,7 +83,13 @@ class OrderBook:
"""Cancel an order (or alert) in the EMS. """Cancel an order (or alert) in the EMS.
""" """
cmd = self._sent_orders[uuid] cmd = self._sent_orders.get(uuid)
if not cmd:
log.error(
f'Unknown order {uuid}!?\n'
f'Maybe there is a stale entry or line?\n'
f'You should report this as a bug!'
)
msg = Cancel( msg = Cancel(
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
@ -149,10 +155,17 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd.symbol == symbol_key: sym = cmd.symbol
log.info(f'Send order cmd:\n{pformat(cmd)}') msg = pformat(cmd)
if sym == symbol_key:
log.info(f'Send order cmd:\n{msg}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)
else:
log.warning(
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
f'\n{msg}'
)
@acm @acm
@ -220,11 +233,19 @@ async def open_ems(
fqsn=fqsn, fqsn=fqsn,
exec_mode=mode, exec_mode=mode,
) as (ctx, (positions, accounts)), ) as (
ctx,
(
positions,
accounts,
dialogs,
)
),
# open 2-way trade command stream # open 2-way trade command stream
ctx.open_stream() as trades_stream, ctx.open_stream() as trades_stream,
): ):
# start sync code order msg delivery task
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon( n.start_soon(
relay_order_cmds_from_sync_code, relay_order_cmds_from_sync_code,
@ -232,4 +253,10 @@ async def open_ems(
trades_stream trades_stream
) )
yield book, trades_stream, positions, accounts yield (
book,
trades_stream,
positions,
accounts,
dialogs,
)

View File

@ -18,8 +18,8 @@
In da suit parlances: "Execution management systems" In da suit parlances: "Execution management systems"
""" """
from collections import defaultdict, ChainMap
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan from math import isnan
from pprint import pformat from pprint import pformat
import time import time
@ -27,6 +27,7 @@ from typing import (
AsyncIterator, AsyncIterator,
Any, Any,
Callable, Callable,
Optional,
) )
from bidict import bidict from bidict import bidict
@ -41,9 +42,16 @@ from ..data.types import Struct
from .._daemon import maybe_spawn_brokerd from .._daemon import maybe_spawn_brokerd
from . import _paper_engine as paper from . import _paper_engine as paper
from ._messages import ( from ._messages import (
Status, Order, Order,
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, Status,
BrokerdFill, BrokerdError, BrokerdPosition, # Cancel,
BrokerdCancel,
BrokerdOrder,
# BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdError,
BrokerdPosition,
) )
@ -90,8 +98,7 @@ def mk_check(
) )
@dataclass class _DarkBook(Struct):
class _DarkBook:
''' '''
EMS-trigger execution book. EMS-trigger execution book.
@ -116,17 +123,24 @@ class _DarkBook:
dict, # cmd / msg type dict, # cmd / msg type
] ]
] ]
] = field(default_factory=dict) ] = {}
# tracks most recent values per symbol each from data feed # tracks most recent values per symbol each from data feed
lasts: dict[ lasts: dict[
str, str,
float, float,
] = field(default_factory=dict) ] = {}
# mapping of piker ems order ids to current brokerd order flow message # _ems_entries: dict[str, str] = {}
_ems_entries: dict[str, str] = field(default_factory=dict) _active: dict = {}
_ems2brokerd_ids: dict[str, str] = field(default_factory=bidict)
# mapping of ems dialog ids to msg flow history
_msgflows: defaultdict[
int,
ChainMap[dict[str, dict]],
] = defaultdict(ChainMap)
_ems2brokerd_ids: dict[str, str] = bidict()
# XXX: this is in place to prevent accidental positions that are too # XXX: this is in place to prevent accidental positions that are too
@ -181,6 +195,7 @@ async def clear_dark_triggers(
for oid, ( for oid, (
pred, pred,
tf, tf,
# TODO: send this msg instead?
cmd, cmd,
percent_away, percent_away,
abs_diff_away abs_diff_away
@ -188,9 +203,9 @@ async def clear_dark_triggers(
tuple(execs.items()) tuple(execs.items())
): ):
if ( if (
not pred or not pred
ttype not in tf or or ttype not in tf
not pred(price) or not pred(price)
): ):
# log.runtime( # log.runtime(
# f'skipping quote for {sym} ' # f'skipping quote for {sym} '
@ -200,30 +215,29 @@ async def clear_dark_triggers(
# majority of iterations will be non-matches # majority of iterations will be non-matches
continue continue
brokerd_msg: Optional[BrokerdOrder] = None
match cmd: match cmd:
# alert: nothing to do but relay a status # alert: nothing to do but relay a status
# back to the requesting ems client # back to the requesting ems client
case { case Order(action='alert'):
'action': 'alert', resp = 'triggered'
}:
resp = 'alert_triggered'
# executable order submission # executable order submission
case { case Order(
'action': action, action=action,
'symbol': symbol, symbol=symbol,
'account': account, account=account,
'size': size, size=size,
}: ):
bfqsn: str = symbol.replace(f'.{broker}', '') bfqsn: str = symbol.replace(f'.{broker}', '')
submit_price = price + abs_diff_away submit_price = price + abs_diff_away
resp = 'dark_triggered' # hidden on client-side resp = 'triggered' # hidden on client-side
log.info( log.info(
f'Dark order triggered for price {price}\n' f'Dark order triggered for price {price}\n'
f'Submitting order @ price {submit_price}') f'Submitting order @ price {submit_price}')
live_req = BrokerdOrder( brokerd_msg = BrokerdOrder(
action=action, action=action,
oid=oid, oid=oid,
account=account, account=account,
@ -232,7 +246,8 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=size, size=size,
) )
await brokerd_orders_stream.send(live_req)
await brokerd_orders_stream.send(brokerd_msg)
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -240,18 +255,19 @@ async def clear_dark_triggers(
# a ``BrokerdOrderAck`` msg including the # a ``BrokerdOrderAck`` msg including the
# allocated unique ``BrokerdOrderAck.reqid`` key # allocated unique ``BrokerdOrderAck.reqid`` key
# generated by the broker's own systems. # generated by the broker's own systems.
book._ems_entries[oid] = live_req # book._ems_entries[oid] = live_req
# book._msgflows[oid].maps.insert(0, live_req)
case _: case _:
raise ValueError(f'Invalid dark book entry: {cmd}') raise ValueError(f'Invalid dark book entry: {cmd}')
# fallthrough logic # fallthrough logic
resp = Status( status = Status(
oid=oid, # ems dialog id oid=oid, # ems dialog id
time_ns=time.time_ns(), time_ns=time.time_ns(),
resp=resp, resp=resp,
trigger_price=price, req=cmd,
brokerd_msg=cmd, brokerd_msg=brokerd_msg,
) )
# remove exec-condition from set # remove exec-condition from set
@ -262,9 +278,18 @@ async def clear_dark_triggers(
f'pred for {oid} was already removed!?' f'pred for {oid} was already removed!?'
) )
# update actives
if cmd.action == 'alert':
# don't register the alert status (so it won't
# be reloaded by clients) since it's now
# complete / closed.
book._active.pop(oid)
else:
book._active[oid] = status
# send response to client-side # send response to client-side
try: try:
await ems_client_order_stream.send(resp) await ems_client_order_stream.send(status)
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
): ):
@ -281,8 +306,7 @@ async def clear_dark_triggers(
# print(f'execs scan took: {time.time() - start}') # print(f'execs scan took: {time.time() - start}')
@dataclass class TradesRelay(Struct):
class TradesRelay:
# for now we keep only a single connection open with # for now we keep only a single connection open with
# each ``brokerd`` for simplicity. # each ``brokerd`` for simplicity.
@ -318,7 +342,10 @@ class Router(Struct):
# order id to client stream map # order id to client stream map
clients: set[tractor.MsgStream] = set() clients: set[tractor.MsgStream] = set()
dialogues: dict[str, list[tractor.MsgStream]] = {} dialogues: dict[
str,
list[tractor.MsgStream]
] = {}
# brokername to trades-dialogues streams with ``brokerd`` actors # brokername to trades-dialogues streams with ``brokerd`` actors
relays: dict[str, TradesRelay] = {} relays: dict[str, TradesRelay] = {}
@ -341,11 +368,12 @@ class Router(Struct):
loglevel: str, loglevel: str,
) -> tuple[dict, tractor.MsgStream]: ) -> tuple[dict, tractor.MsgStream]:
'''Open and yield ``brokerd`` trades dialogue context-stream if none '''
already exists. Open and yield ``brokerd`` trades dialogue context-stream if
none already exists.
''' '''
relay = self.relays.get(feed.mod.name) relay: TradesRelay = self.relays.get(feed.mod.name)
if ( if (
relay is None relay is None
@ -381,6 +409,22 @@ class Router(Struct):
relay.consumers -= 1 relay.consumers -= 1
async def client_broadcast(
self,
msg: dict,
) -> None:
for client_stream in self.clients.copy():
try:
await client_stream.send(msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
self.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
_router: Router = None _router: Router = None
@ -452,7 +496,6 @@ async def open_brokerd_trades_dialogue(
async with ( async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream, brokerd_ctx.open_stream() as brokerd_trades_stream,
): ):
# XXX: really we only want one stream per `emsd` actor # XXX: really we only want one stream per `emsd` actor
# to relay global `brokerd` order events unless we're # to relay global `brokerd` order events unless we're
@ -502,14 +545,9 @@ async def open_brokerd_trades_dialogue(
task_status.started(relay) task_status.started(relay)
await translate_and_relay_brokerd_events(
broker,
brokerd_trades_stream,
_router,
)
# this context should block here indefinitely until # this context should block here indefinitely until
# the ``brokerd`` task either dies or is cancelled # the ``brokerd`` task either dies or is cancelled
await trio.sleep_forever()
finally: finally:
# parent context must have been closed # parent context must have been closed
@ -561,15 +599,14 @@ async def translate_and_relay_brokerd_events(
broker ems broker ems
'error' -> log it locally (for now) 'error' -> log it locally (for now)
'status' -> relabel as 'broker_<status>', if complete send 'executed' ('status' | 'fill'} -> relayed through see ``Status`` msg type.
'fill' -> 'broker_filled'
Currently handled status values from IB: Currently handled status values from IB:
{'presubmitted', 'submitted', 'cancelled', 'inactive'} {'presubmitted', 'submitted', 'cancelled', 'inactive'}
''' '''
book = router.get_dark_book(broker) book: _DarkBook = router.get_dark_book(broker)
relay = router.relays[broker] relay: TradesRelay = router.relays[broker]
assert relay.brokerd_dialogue == brokerd_trades_stream assert relay.brokerd_dialogue == brokerd_trades_stream
@ -601,30 +638,16 @@ async def translate_and_relay_brokerd_events(
# fan-out-relay position msgs immediately by # fan-out-relay position msgs immediately by
# broadcasting updates on all client streams # broadcasting updates on all client streams
for client_stream in router.clients.copy(): await router.client_broadcast(pos_msg)
try:
await client_stream.send(pos_msg)
except(
trio.ClosedResourceError,
trio.BrokenResourceError,
):
router.clients.remove(client_stream)
log.warning(
f'client for {client_stream} was already closed?')
continue continue
# BrokerdOrderAck # BrokerdOrderAck
# initial response to brokerd order request
case { case {
'name': 'ack', 'name': 'ack',
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
'oid': oid, # ems order-dialog id 'oid': oid, # ems order-dialog id
} if ( }:
entry := book._ems_entries.get(oid)
):
# initial response to brokerd order request
# if name == 'ack':
# register the brokerd request id (that was generated # register the brokerd request id (that was generated
# / created internally by the broker backend) with our # / created internally by the broker backend) with our
# local ems order id for reverse lookup later. # local ems order id for reverse lookup later.
@ -639,23 +662,24 @@ async def translate_and_relay_brokerd_events(
# new order which has not yet be registered into the # new order which has not yet be registered into the
# local ems book, insert it now and handle 2 cases: # local ems book, insert it now and handle 2 cases:
# - the order has previously been requested to be # 1. the order has previously been requested to be
# cancelled by the ems controlling client before we # cancelled by the ems controlling client before we
# received this ack, in which case we relay that cancel # received this ack, in which case we relay that cancel
# signal **asap** to the backend broker # signal **asap** to the backend broker
action = getattr(entry, 'action', None) # status = book._active.get(oid)
if action and action == 'cancel': status_msg = book._active[oid]
req = status_msg.req
if req and req.action == 'cancel':
# assign newly providerd broker backend request id # assign newly providerd broker backend request id
entry.reqid = reqid # and tell broker to cancel immediately
status_msg.reqid = reqid
await brokerd_trades_stream.send(req)
# tell broker to cancel immediately # 2. the order is now active and will be mirrored in
await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
else: else:
# update the flow with the ack msg # TODO: should we relay this ack state?
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) status_msg.resp = 'pending'
# no msg to client necessary # no msg to client necessary
continue continue
@ -666,11 +690,9 @@ async def translate_and_relay_brokerd_events(
'oid': oid, # ems order-dialog id 'oid': oid, # ems order-dialog id
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
'symbol': sym, 'symbol': sym,
'broker_details': details, } if status_msg := book._active.get(oid):
# 'reason': reason,
}:
msg = BrokerdError(**brokerd_msg) msg = BrokerdError(**brokerd_msg)
resp = 'broker_errored'
log.error(pformat(msg)) # XXX make one when it's blank? log.error(pformat(msg)) # XXX make one when it's blank?
# TODO: figure out how this will interact with EMS clients # TODO: figure out how this will interact with EMS clients
@ -680,43 +702,64 @@ async def translate_and_relay_brokerd_events(
# some unexpected failure - something we need to think more # some unexpected failure - something we need to think more
# about. In most default situations, with composed orders # about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy. # (ex. brackets), most brokers seem to use a oca policy.
ems_client_order_stream = router.dialogues[oid]
status_msg.resp = 'error'
status_msg.brokerd_msg = msg
book._active[oid] = status_msg
await ems_client_order_stream.send(status_msg)
# BrokerdStatus # BrokerdStatus
case { case {
'name': 'status', 'name': 'status',
'status': status, 'status': status,
'reqid': reqid, # brokerd generated order-request id 'reqid': reqid, # brokerd generated order-request id
# TODO: feels like the wrong msg for this field?
'remaining': remaining,
} if ( } if (
oid := book._ems2brokerd_ids.inverse.get(reqid) (oid := book._ems2brokerd_ids.inverse.get(reqid))
and status in (
'canceled',
'open',
'closed',
)
): ):
msg = BrokerdStatus(**brokerd_msg) msg = BrokerdStatus(**brokerd_msg)
# TODO: should we flatten out these cases and/or should # TODO: maybe pack this into a composite type that
# they maybe even eventually be separate messages? # contains both the IPC stream as well the
if status == 'cancelled': # msg-chain/dialog.
log.info(f'Cancellation for {oid} is complete!') ems_client_order_stream = router.dialogues[oid]
status_msg = book._active[oid]
old_resp = status_msg.resp
status_msg.resp = status
if status == 'filled': # retrieve existing live flow
# conditional execution is fully complete, no more old_reqid = status_msg.reqid
# fills for the noted order if old_reqid and old_reqid != reqid:
if not remaining: log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
resp = 'broker_executed' status_msg.reqid = reqid # THIS LINE IS CRITICAL!
status_msg.brokerd_msg = msg
status_msg.src = msg.broker_details['name']
await ems_client_order_stream.send(status_msg)
# be sure to pop this stream from our dialogue set if status == 'closed':
# since the order dialogue should be done.
log.info(f'Execution for {oid} is complete!') log.info(f'Execution for {oid} is complete!')
# just log it # only if we already rxed a fill then probably
else: # this clear is fully complete? (frickin ib..)
log.info(f'{broker} filled {msg}') if old_resp == 'fill':
status_msg = book._active.pop(oid)
else: elif status == 'canceled':
# one of {submitted, cancelled} log.cancel(f'Cancellation for {oid} is complete!')
resp = 'broker_' + msg.status
else: # open
# relayed from backend but probably not handled so
# just log it
log.info(f'{broker} opened order {msg}')
# BrokerdFill # BrokerdFill
case { case {
@ -728,82 +771,111 @@ async def translate_and_relay_brokerd_events(
): ):
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
msg = BrokerdFill(**brokerd_msg) msg = BrokerdFill(**brokerd_msg)
resp = 'broker_filled' log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
# unknown valid message case? ems_client_order_stream = router.dialogues[oid]
# case {
# 'name': name,
# 'symbol': sym,
# 'reqid': reqid, # brokerd generated order-request id
# # 'oid': oid, # ems order-dialog id
# 'broker_details': details,
# } if ( # wtf a fill can come after 'closed' from ib?
# book._ems2brokerd_ids.inverse.get(reqid) is None status_msg = book._active[oid]
# ):
# # TODO: pretty sure we can drop this now?
# # XXX: paper clearing special cases # only if we already rxed a 'closed'
# # paper engine race case: ``Client.submit_limit()`` hasn't # this clear is fully complete? (frickin ib..)
# # returned yet and provided an output reqid to register # if status_msg.resp == 'closed':
# # locally, so we need to retreive the oid that was already # status_msg = book._active.pop(oid)
# # packed at submission since we already know it ahead of
# # time
# paper = details.get('paper_info')
# ext = details.get('external')
# if paper: status_msg.resp = 'fill'
# # paperboi keeps the ems id up front status_msg.reqid = reqid
# oid = paper['oid'] status_msg.brokerd_msg = msg
await ems_client_order_stream.send(status_msg)
# elif ext: # ``Status`` containing an embedded order msg which
# # may be an order msg specified as "external" to the # should be loaded as a "pre-existing open order" from the
# # piker ems flow (i.e. generated by some other # brokerd backend.
# # external broker backend client (like tws for ib) case {
# log.error(f"External trade event {name}@{ext}") 'name': 'status',
'resp': status,
'reqid': reqid, # brokerd generated order-request id
}:
if (
status != 'open'
):
# TODO: check for an oid we might know since it was
# registered from a previous order/status load?
log.error(
f'Unknown/transient status msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
# else: # TODO: we probably want some kind of "tagging" system
# # something is out of order, we don't have an oid for # for external order submissions like this eventually
# # this broker-side message. # to be able to more formally handle multi-player
# log.error( # trading...
# f'Unknown oid: {oid} for msg {name}:\n' else:
# f'{pformat(brokerd_msg)}\n' # existing open backend order which we broadcast to
# 'Unable to relay message to client side!?' # all currently connected clients.
# ) log.info(
f'Relaying existing open order:\n {brokerd_msg}'
)
# continue # use backend request id as our ems id though this
# may end up with collisions?
status_msg = Status(**brokerd_msg)
order = Order(**status_msg.req)
assert order.price and order.size
status_msg.req = order
assert status_msg.src # source tag?
oid = str(status_msg.reqid)
# attempt to avoid collisions
status_msg.reqid = oid
assert status_msg.resp == 'open'
# register this existing broker-side dialog
book._ems2brokerd_ids[oid] = reqid
book._active[oid] = status_msg
# fan-out-relay position msgs immediately by
# broadcasting updates on all client streams
await router.client_broadcast(status_msg)
# don't fall through
continue
# brokerd error
case {
'name': 'status',
'status': 'error',
}:
log.error(f'Broker error:\n{pformat(brokerd_msg)}')
# XXX: we presume the brokerd cancels its own order
# TOO FAST ``BrokerdStatus`` that arrives
# before the ``BrokerdAck``.
case {
# XXX: sometimes there is a race with the backend (like
# `ib` where the pending stauts will be related before
# the ack, in which case we just ignore the faster
# pending msg and wait for our expected ack to arrive
# later (i.e. the first block below should enter).
'name': 'status',
'status': status,
'reqid': reqid,
}:
status_msg = book._active[oid]
log.warning(
'Unhandled broker status for dialog:\n'
f'{pformat(status_msg)}\n'
f'{pformat(brokerd_msg)}\n'
)
case _: case _:
raise ValueError(f'Brokerd message {brokerd_msg} is invalid') raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
# retrieve existing live flow # XXX: ugh sometimes we don't access it?
entry = book._ems_entries[oid] if status_msg:
assert entry.oid == oid del status_msg
old_reqid = entry.reqid
if old_reqid and old_reqid != reqid:
log.warning(
f'Brokerd order id change for {oid}:\n'
f'{old_reqid} -> {reqid}'
)
# Create and relay response status message
# to requesting EMS client
try:
ems_client_order_stream = router.dialogues[oid]
await ems_client_order_stream.send(
Status(
oid=oid,
resp=resp,
time_ns=time.time_ns(),
broker_reqid=reqid,
brokerd_msg=msg,
)
)
except KeyError:
log.error(
f'Received `brokerd` msg for unknown client with oid: {oid}')
# TODO: do we want this to keep things cleaned up? # TODO: do we want this to keep things cleaned up?
# it might require a special status from brokerd to affirm the # it might require a special status from brokerd to affirm the
@ -829,27 +901,36 @@ async def process_client_order_cmds(
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info(f'Received order cmd:\n{pformat(cmd)}') log.info(f'Received order cmd:\n{pformat(cmd)}')
oid = cmd['oid'] # CAWT DAMN we need struct support!
oid = str(cmd['oid'])
# register this stream as an active dialogue for this order id # register this stream as an active dialogue for this order id
# such that translated message from the brokerd backend can be # such that translated message from the brokerd backend can be
# routed (relayed) to **just** that client stream (and in theory # routed (relayed) to **just** that client stream (and in theory
# others who are registered for such order affiliated msgs). # others who are registered for such order affiliated msgs).
client_dialogues[oid] = client_order_stream client_dialogues[oid] = client_order_stream
reqid = dark_book._ems2brokerd_ids.inverse.get(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
live_entry = dark_book._ems_entries.get(oid)
# any dark/live status which is current
status = dark_book._active.get(oid)
match cmd: match cmd:
# existing live-broker order cancel # existing live-broker order cancel
case { case {
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if live_entry: } if (
reqid = live_entry.reqid (status := dark_book._active.get(oid))
msg = BrokerdCancel( and status.resp in ('open', 'pending')
):
reqid = status.reqid
order = status.req
to_brokerd_msg = BrokerdCancel(
oid=oid, oid=oid,
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
account=live_entry.account, # account=live_entry.account,
account=order.account,
) )
# NOTE: cancel response will be relayed back in messages # NOTE: cancel response will be relayed back in messages
@ -859,39 +940,53 @@ async def process_client_order_cmds(
log.info( log.info(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(to_brokerd_msg)
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
# acked yet by a brokerd, so register a cancel for when # acked yet by a brokerd, so register a cancel for when
# the order ack does show up later such that the brokerd # the order ack does show up later such that the brokerd
# order request can be cancelled at that time. # order request can be cancelled at that time.
dark_book._ems_entries[oid] = msg # dark_book._ems_entries[oid] = msg
# special case for now..
status.req = to_brokerd_msg
# dark trigger cancel # dark trigger cancel
case { case {
'action': 'cancel', 'action': 'cancel',
'oid': oid, 'oid': oid,
} if not live_entry: } if (
try: status and status.resp == 'dark_open'
# or status and status.req
):
# remove from dark book clearing # remove from dark book clearing
dark_book.orders[symbol].pop(oid, None) entry = dark_book.orders[symbol].pop(oid, None)
if entry:
(
pred,
tickfilter,
cmd,
percent_away,
abs_diff_away
) = entry
# tell client side that we've cancelled the # tell client side that we've cancelled the
# dark-trigger order # dark-trigger order
await client_order_stream.send( status.resp = 'canceled'
Status( status.req = cmd
resp='dark_cancelled',
oid=oid, await client_order_stream.send(status)
time_ns=time.time_ns(),
)
)
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
dark_book._active.pop(oid)
except KeyError: else:
log.exception(f'No dark order for {symbol}?') log.exception(f'No dark order for {symbol}?')
# TODO: eventually we should be receiving
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
# live order submission # live order submission
case { case {
'oid': oid, 'oid': oid,
@ -899,11 +994,9 @@ async def process_client_order_cmds(
'price': trigger_price, 'price': trigger_price,
'size': size, 'size': size,
'action': ('buy' | 'sell') as action, 'action': ('buy' | 'sell') as action,
'exec_mode': 'live', 'exec_mode': ('live' | 'paper'),
}: }:
# TODO: eventually we should be receiving # TODO: relay this order msg directly?
# this struct on the wire unpacked in a scoped protocol
# setup with ``tractor``.
req = Order(**cmd) req = Order(**cmd)
broker = req.brokers[0] broker = req.brokers[0]
@ -912,13 +1005,13 @@ async def process_client_order_cmds(
# aren't expectig their own name, but should they? # aren't expectig their own name, but should they?
sym = fqsn.replace(f'.{broker}', '') sym = fqsn.replace(f'.{broker}', '')
if live_entry is not None: if status is not None:
# sanity check on emsd id
assert live_entry.oid == oid
reqid = live_entry.reqid
# if we already had a broker order id then # if we already had a broker order id then
# this is likely an order update commmand. # this is likely an order update commmand.
log.info(f"Modifying live {broker} order: {reqid}") log.info(f"Modifying live {broker} order: {reqid}")
reqid = status.reqid
status.req = req
status.resp = 'pending'
msg = BrokerdOrder( msg = BrokerdOrder(
oid=oid, # no ib support for oids... oid=oid, # no ib support for oids...
@ -935,6 +1028,18 @@ async def process_client_order_cmds(
account=req.account, account=req.account,
) )
if status is None:
status = Status(
oid=oid,
reqid=reqid,
resp='pending',
time_ns=time.time_ns(),
brokerd_msg=msg,
req=req,
)
dark_book._active[oid] = status
# send request to backend # send request to backend
# XXX: the trades data broker response loop # XXX: the trades data broker response loop
# (``translate_and_relay_brokerd_events()`` above) will # (``translate_and_relay_brokerd_events()`` above) will
@ -950,7 +1055,7 @@ async def process_client_order_cmds(
# client, before that ack, when the ack does arrive we # client, before that ack, when the ack does arrive we
# immediately take the reqid from the broker and cancel # immediately take the reqid from the broker and cancel
# that live order asap. # that live order asap.
dark_book._ems_entries[oid] = msg # dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
# dark-order / alert submission # dark-order / alert submission
case { case {
@ -966,9 +1071,11 @@ async def process_client_order_cmds(
# submit order to local EMS book and scan loop, # submit order to local EMS book and scan loop,
# effectively a local clearing engine, which # effectively a local clearing engine, which
# scans for conditions and triggers matching executions # scans for conditions and triggers matching executions
exec_mode in ('dark', 'paper') exec_mode in ('dark',)
or action == 'alert' or action == 'alert'
): ):
req = Order(**cmd)
# Auto-gen scanner predicate: # Auto-gen scanner predicate:
# we automatically figure out what the alert check # we automatically figure out what the alert check
# condition should be based on the current first # condition should be based on the current first
@ -1015,23 +1122,25 @@ async def process_client_order_cmds(
)[oid] = ( )[oid] = (
pred, pred,
tickfilter, tickfilter,
cmd, req,
percent_away, percent_away,
abs_diff_away abs_diff_away
) )
resp = 'dark_submitted' resp = 'dark_open'
# alerts have special msgs to distinguish # alerts have special msgs to distinguish
if action == 'alert': # if action == 'alert':
resp = 'alert_submitted' # resp = 'open'
await client_order_stream.send( status = Status(
Status(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
req=req,
src='dark',
) )
) dark_book._active[oid] = status
await client_order_stream.send(status)
@tractor.context @tractor.context
@ -1099,10 +1208,9 @@ async def _emsd_main(
): ):
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
first_quote = feed.first_quotes[fqsn] first_quote: dict = feed.first_quotes[fqsn]
book: _DarkBook = _router.get_dark_book(broker)
book = _router.get_dark_book(broker) book.lasts[fqsn]: float = first_quote['last']
book.lasts[fqsn] = first_quote['last']
# open a stream with the brokerd backend for order # open a stream with the brokerd backend for order
# flow dialogue # flow dialogue
@ -1129,12 +1237,25 @@ async def _emsd_main(
await ems_ctx.started(( await ems_ctx.started((
relay.positions, relay.positions,
list(relay.accounts), list(relay.accounts),
book._active,
)) ))
# establish 2-way stream with requesting order-client and # establish 2-way stream with requesting order-client and
# begin handling inbound order requests and updates # begin handling inbound order requests and updates
async with ems_ctx.open_stream() as ems_client_order_stream: async with ems_ctx.open_stream() as ems_client_order_stream:
# register the client side before startingn the
# brokerd-side relay task to ensure the client is
# delivered all exisiting open orders on startup.
_router.clients.add(ems_client_order_stream)
n.start_soon(
translate_and_relay_brokerd_events,
broker,
brokerd_stream,
_router,
)
# trigger scan and exec loop # trigger scan and exec loop
n.start_soon( n.start_soon(
clear_dark_triggers, clear_dark_triggers,
@ -1149,7 +1270,6 @@ async def _emsd_main(
# start inbound (from attached client) order request processing # start inbound (from attached client) order request processing
try: try:
_router.clients.add(ems_client_order_stream)
# main entrypoint, run here until cancelled. # main entrypoint, run here until cancelled.
await process_client_order_cmds( await process_client_order_cmds(

View File

@ -18,24 +18,92 @@
Clearing sub-system message and protocols. Clearing sub-system message and protocols.
""" """
from typing import Optional, Union # from collections import (
# ChainMap,
# deque,
# )
from typing import (
Optional,
Literal,
)
from ..data._source import Symbol from ..data._source import Symbol
from ..data.types import Struct from ..data.types import Struct
# TODO: a composite for tracking msg flow on 2-legged
# dialogs.
# class Dialog(ChainMap):
# '''
# Msg collection abstraction to easily track the state changes of
# a msg flow in one high level, query-able and immutable construct.
# The main use case is to query data from a (long-running)
# msg-transaction-sequence
# '''
# def update(
# self,
# msg,
# ) -> None:
# self.maps.insert(0, msg.to_dict())
# def flatten(self) -> dict:
# return dict(self)
# TODO: ``msgspec`` stuff worth paying attention to: # TODO: ``msgspec`` stuff worth paying attention to:
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution # - schema evolution:
# https://jcristharif.com/msgspec/usage.html#schema-evolution
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# - use literals for a common msg determined by diff keys? # - use literals for a common msg determined by diff keys?
# - https://jcristharif.com/msgspec/usage.html#literal # - https://jcristharif.com/msgspec/usage.html#literal
# - for eg. ``BrokerdStatus``, instead just have separate messages?
# -------------- # --------------
# Client -> emsd # Client -> emsd
# -------------- # --------------
class Order(Struct):
# TODO: ideally we can combine these 2 fields into
# 1 and just use the size polarity to determine a buy/sell.
# i would like to see this become more like
# https://jcristharif.com/msgspec/usage.html#literal
# action: Literal[
# 'live',
# 'dark',
# 'alert',
# ]
action: Literal[
'buy',
'sell',
'alert',
]
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: Literal[
'dark',
'live',
# 'paper', no right?
]
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: str | Symbol
account: str # should we set a default as '' ?
price: float
size: float # -ve is "sell", +ve is "buy"
brokers: Optional[list[str]] = []
class Cancel(Struct): class Cancel(Struct):
'''Cancel msg for removing a dark (ems triggered) or '''
Cancel msg for removing a dark (ems triggered) or
broker-submitted (live) trigger/order. broker-submitted (live) trigger/order.
''' '''
@ -44,32 +112,6 @@ class Cancel(Struct):
symbol: str symbol: str
class Order(Struct):
# TODO: use ``msgspec.Literal``
# https://jcristharif.com/msgspec/usage.html#literal
action: str # {'buy', 'sell', 'alert'}
# internal ``emdsd`` unique "order id"
oid: str # uuid4
symbol: Union[str, Symbol]
account: str # should we set a default as '' ?
price: float
# TODO: could we drop the ``.action`` field above and instead just
# use +/- values here? Would make the msg smaller at the sake of a
# teensie fp precision?
size: float
brokers: list[str]
# Assigned once initial ack is received
# ack_time_ns: Optional[int] = None
# determines whether the create execution
# will be submitted to the ems or directly to
# the backend broker
exec_mode: str # {'dark', 'live', 'paper'}
# -------------- # --------------
# Client <- emsd # Client <- emsd
# -------------- # --------------
@ -79,37 +121,39 @@ class Order(Struct):
class Status(Struct): class Status(Struct):
name: str = 'status' name: str = 'status'
oid: str # uuid4
time_ns: int time_ns: int
oid: str # uuid4 ems-order dialog id
# { resp: Literal[
# 'dark_submitted', 'pending', # acked by broker but not yet open
# 'dark_cancelled', 'open',
# 'dark_triggered', 'dark_open', # dark/algo triggered order is open in ems clearing loop
'triggered', # above triggered order sent to brokerd, or an alert closed
# 'broker_submitted', 'closed', # fully cleared all size/units
# 'broker_cancelled', 'fill', # partial execution
# 'broker_executed', 'canceled',
# 'broker_filled', 'error',
# 'broker_errored', ]
# 'alert_submitted',
# 'alert_triggered',
# }
resp: str # "response", see above
# trigger info
trigger_price: Optional[float] = None
# price: float
# broker: Optional[str] = None
# this maps normally to the ``BrokerdOrder.reqid`` below, an id # this maps normally to the ``BrokerdOrder.reqid`` below, an id
# normally allocated internally by the backend broker routing system # normally allocated internally by the backend broker routing system
broker_reqid: Optional[Union[int, str]] = None reqid: Optional[int | str] = None
# for relaying backend msg data "through" the ems layer # the (last) source order/request msg if provided
# (eg. the Order/Cancel which causes this msg) and
# acts as a back-reference to the corresponding
# request message which was the source of this msg.
req: Optional[Order | Cancel] = None
# XXX: better design/name here?
# flag that can be set to indicate a message for an order
# event that wasn't originated by piker's emsd (eg. some external
# trading system which does it's own order control but that you
# might want to "track" using piker UIs/systems).
src: Optional[str] = None
# for relaying a boxed brokerd-dialog-side msg data "through" the
# ems layer to clients.
brokerd_msg: dict = {} brokerd_msg: dict = {}
@ -131,25 +175,28 @@ class BrokerdCancel(Struct):
# for setting a unique order id then this value will be relayed back # for setting a unique order id then this value will be relayed back
# on the emsd order request stream as the ``BrokerdOrderAck.reqid`` # on the emsd order request stream as the ``BrokerdOrderAck.reqid``
# field # field
reqid: Optional[Union[int, str]] = None reqid: Optional[int | str] = None
class BrokerdOrder(Struct): class BrokerdOrder(Struct):
action: str # {buy, sell}
oid: str oid: str
account: str account: str
time_ns: int time_ns: int
# TODO: if we instead rely on a +ve/-ve size to determine
# the action we more or less don't need this field right?
action: str = '' # {buy, sell}
# "broker request id": broker specific/internal order id if this is # "broker request id": broker specific/internal order id if this is
# None, creates a new order otherwise if the id is valid the backend # None, creates a new order otherwise if the id is valid the backend
# api must modify the existing matching order. If the broker allows # api must modify the existing matching order. If the broker allows
# for setting a unique order id then this value will be relayed back # for setting a unique order id then this value will be relayed back
# on the emsd order request stream as the ``BrokerdOrderAck.reqid`` # on the emsd order request stream as the ``BrokerdOrderAck.reqid``
# field # field
reqid: Optional[Union[int, str]] = None reqid: Optional[int | str] = None
symbol: str # symbol.<providername> ? symbol: str # fqsn
price: float price: float
size: float size: float
@ -170,7 +217,7 @@ class BrokerdOrderAck(Struct):
name: str = 'ack' name: str = 'ack'
# defined and provided by backend # defined and provided by backend
reqid: Union[int, str] reqid: int | str
# emsd id originally sent in matching request msg # emsd id originally sent in matching request msg
oid: str oid: str
@ -180,30 +227,22 @@ class BrokerdOrderAck(Struct):
class BrokerdStatus(Struct): class BrokerdStatus(Struct):
name: str = 'status' name: str = 'status'
reqid: Union[int, str] reqid: int | str
time_ns: int time_ns: int
status: Literal[
'open',
'canceled',
'fill',
'pending',
'error',
]
# XXX: should be best effort set for every update account: str
account: str = ''
# TODO: instead (ack, pending, open, fill, clos(ed), cancelled)
# {
# 'submitted',
# 'cancelled',
# 'filled',
# }
status: str
filled: float = 0.0 filled: float = 0.0
reason: str = '' reason: str = ''
remaining: float = 0.0 remaining: float = 0.0
# XXX: better design/name here? # external: bool = False
# flag that can be set to indicate a message for an order
# event that wasn't originated by piker's emsd (eg. some external
# trading system which does it's own order control but that you
# might want to "track" using piker UIs/systems).
external: bool = False
# XXX: not required schema as of yet # XXX: not required schema as of yet
broker_details: dict = { broker_details: dict = {
@ -218,7 +257,7 @@ class BrokerdFill(Struct):
''' '''
name: str = 'fill' name: str = 'fill'
reqid: Union[int, str] reqid: int | str
time_ns: int time_ns: int
# order exeuction related # order exeuction related
@ -248,7 +287,7 @@ class BrokerdError(Struct):
# if no brokerd order request was actually submitted (eg. we errored # if no brokerd order request was actually submitted (eg. we errored
# at the ``pikerd`` layer) then there will be ``reqid`` allocated. # at the ``pikerd`` layer) then there will be ``reqid`` allocated.
reqid: Optional[Union[int, str]] = None reqid: Optional[int | str] = None
symbol: str symbol: str
reason: str reason: str

View File

@ -45,8 +45,13 @@ from ..data._normalize import iterticks
from ..data._source import unpack_fqsn from ..data._source import unpack_fqsn
from ..log import get_logger from ..log import get_logger
from ._messages import ( from ._messages import (
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, BrokerdCancel,
BrokerdFill, BrokerdPosition, BrokerdError BrokerdOrder,
BrokerdOrderAck,
BrokerdStatus,
BrokerdFill,
BrokerdPosition,
BrokerdError,
) )
@ -94,6 +99,10 @@ class PaperBoi:
''' '''
is_modify: bool = False is_modify: bool = False
if action == 'alert':
# bypass all fill simulation
return reqid
entry = self._reqids.get(reqid) entry = self._reqids.get(reqid)
if entry: if entry:
# order is already existing, this is a modify # order is already existing, this is a modify
@ -104,10 +113,6 @@ class PaperBoi:
# register order internally # register order internally
self._reqids[reqid] = (oid, symbol, action, price) self._reqids[reqid] = (oid, symbol, action, price)
if action == 'alert':
# bypass all fill simulation
return reqid
# TODO: net latency model # TODO: net latency model
# we checkpoint here quickly particulalry # we checkpoint here quickly particulalry
# for dark orders since we want the dark_executed # for dark orders since we want the dark_executed
@ -119,7 +124,9 @@ class PaperBoi:
size = -size size = -size
msg = BrokerdStatus( msg = BrokerdStatus(
status='submitted', status='open',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
filled=0.0, filled=0.0,
@ -136,7 +143,14 @@ class PaperBoi:
) or ( ) or (
action == 'sell' and (clear_price := self.last_bid[0]) >= price action == 'sell' and (clear_price := self.last_bid[0]) >= price
): ):
await self.fake_fill(symbol, clear_price, size, action, reqid, oid) await self.fake_fill(
symbol,
clear_price,
size,
action,
reqid,
oid,
)
else: else:
# register this submissions as a paper live order # register this submissions as a paper live order
@ -178,7 +192,9 @@ class PaperBoi:
await trio.sleep(0.05) await trio.sleep(0.05)
msg = BrokerdStatus( msg = BrokerdStatus(
status='cancelled', status='canceled',
# account=f'paper_{self.broker}',
account='paper',
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_details={'name': 'paperboi'}, broker_details={'name': 'paperboi'},
@ -230,25 +246,23 @@ class PaperBoi:
self._trade_ledger.update(fill_msg.to_dict()) self._trade_ledger.update(fill_msg.to_dict())
if order_complete: if order_complete:
msg = BrokerdStatus( msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
# account=f'paper_{self.broker}',
status='filled', account='paper',
status='closed',
filled=size, filled=size,
remaining=0 if order_complete else remaining, remaining=0 if order_complete else remaining,
# broker_details={
broker_details={ # 'paper_info': {
'paper_info': { # 'oid': oid,
'oid': oid, # },
}, # 'action': action,
'action': action, # 'size': size,
'size': size, # 'price': price,
'price': price, # 'name': self.broker,
'name': self.broker, # },
},
) )
await self.ems_trades_stream.send(msg) await self.ems_trades_stream.send(msg)
@ -393,30 +407,33 @@ async def handle_order_requests(
# order_request: dict # order_request: dict
async for request_msg in ems_order_stream: async for request_msg in ems_order_stream:
action = request_msg['action'] # action = request_msg['action']
match request_msg:
if action in {'buy', 'sell'}: # if action in {'buy', 'sell'}:
case {'action': ('buy' | 'sell')}:
account = request_msg['account'] order = BrokerdOrder(**request_msg)
account = order.account
if account != 'paper': if account != 'paper':
log.error( log.error(
'This is a paper account,' 'This is a paper account,'
' only a `paper` selection is valid' ' only a `paper` selection is valid'
) )
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(BrokerdError(
oid=request_msg['oid'], # oid=request_msg['oid'],
symbol=request_msg['symbol'], oid=order.oid,
# symbol=request_msg['symbol'],
symbol=order.symbol,
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
)) ))
continue continue
# validate # validate
order = BrokerdOrder(**request_msg) # order = BrokerdOrder(**request_msg)
if order.reqid is None: # if order.reqid is None:
reqid = str(uuid.uuid4()) # reqid =
else: # else:
reqid = order.reqid reqid = order.reqid or str(uuid.uuid4())
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -447,14 +464,14 @@ async def handle_order_requests(
reqid=reqid, reqid=reqid,
) )
elif action == 'cancel': # elif action == 'cancel':
case {'action': 'cancel'}:
msg = BrokerdCancel(**request_msg) msg = BrokerdCancel(**request_msg)
await client.submit_cancel( await client.submit_cancel(
reqid=msg.reqid reqid=msg.reqid
) )
else: case _:
log.error(f'Unknown order command: {request_msg}') log.error(f'Unknown order command: {request_msg}')

View File

@ -37,7 +37,7 @@ import time
from math import isnan from math import isnan
from bidict import bidict from bidict import bidict
import msgpack from msgspec.msgpack import encode, decode
import pyqtgraph as pg import pyqtgraph as pg
import numpy as np import numpy as np
import tractor import tractor
@ -774,12 +774,13 @@ async def stream_quotes(
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
# send subs topics to server # send subs topics to server
resp = await ws.send_message( resp = await ws.send_message(
msgpack.dumps({'streams': list(tbks.values())})
encode({'streams': list(tbks.values())})
) )
log.info(resp) log.info(resp)
async def recv() -> dict[str, Any]: async def recv() -> dict[str, Any]:
return msgpack.loads((await ws.get_message()), encoding='utf-8') return decode((await ws.get_message()), encoding='utf-8')
streams = (await recv())['streams'] streams = (await recv())['streams']
log.info(f"Subscribed to {streams}") log.info(f"Subscribed to {streams}")

View File

@ -18,6 +18,7 @@
Built-in (extension) types. Built-in (extension) types.
""" """
import sys
from typing import Optional from typing import Optional
from pprint import pformat from pprint import pformat
@ -42,8 +43,13 @@ class Struct(
} }
def __repr__(self): def __repr__(self):
# only turn on pprint when we detect a python REPL
# at runtime B)
if hasattr(sys, 'ps1'):
return f'Struct({pformat(self.to_dict())})' return f'Struct({pformat(self.to_dict())})'
return super().__repr__()
def copy( def copy(
self, self,
update: Optional[dict] = None, update: Optional[dict] = None,

View File

@ -134,6 +134,8 @@ class Position(Struct):
# unique backend symbol id # unique backend symbol id
bsuid: str bsuid: str
split_ratio: Optional[int] = None
# ordered record of known constituent trade messages # ordered record of known constituent trade messages
clears: dict[ clears: dict[
Union[str, int, Status], # trade id Union[str, int, Status], # trade id
@ -159,6 +161,9 @@ class Position(Struct):
clears = d.pop('clears') clears = d.pop('clears')
expiry = d.pop('expiry') expiry = d.pop('expiry')
if self.split_ratio is None:
d.pop('split_ratio')
# TODO: we need to figure out how to have one top level # TODO: we need to figure out how to have one top level
# listing venue here even when the backend isn't providing # listing venue here even when the backend isn't providing
# it via the trades ledger.. # it via the trades ledger..
@ -384,12 +389,22 @@ class Position(Struct):
asize_h.append(accum_size) asize_h.append(accum_size)
ppu_h.append(ppu_h[-1]) ppu_h.append(ppu_h[-1])
return ppu_h[-1] if ppu_h else 0 final_ppu = ppu_h[-1] if ppu_h else 0
# handle any split info entered (for now) manually by user
if self.split_ratio is not None:
final_ppu /= self.split_ratio
return final_ppu
def calc_size(self) -> float: def calc_size(self) -> float:
size: float = 0 size: float = 0
for tid, entry in self.clears.items(): for tid, entry in self.clears.items():
size += entry['size'] size += entry['size']
if self.split_ratio is not None:
size = round(size * self.split_ratio)
return size return size
def minimize_clears( def minimize_clears(
@ -848,6 +863,7 @@ def open_pps(
size = entry['size'] size = entry['size']
# TODO: remove but, handle old field name for now # TODO: remove but, handle old field name for now
ppu = entry.get('ppu', entry.get('be_price', 0)) ppu = entry.get('ppu', entry.get('be_price', 0))
split_ratio = entry.get('split_ratio')
expiry = entry.get('expiry') expiry = entry.get('expiry')
if expiry: if expiry:
@ -857,6 +873,7 @@ def open_pps(
Symbol.from_fqsn(fqsn, info={}), Symbol.from_fqsn(fqsn, info={}),
size=size, size=size,
ppu=ppu, ppu=ppu,
split_ratio=split_ratio,
expiry=expiry, expiry=expiry,
bsuid=entry['bsuid'], bsuid=entry['bsuid'],

View File

@ -140,9 +140,9 @@ class LineEditor:
) -> LevelLine: ) -> LevelLine:
staged_line = self._active_staged_line # staged_line = self._active_staged_line
if not staged_line: # if not staged_line:
raise RuntimeError("No line is currently staged!?") # raise RuntimeError("No line is currently staged!?")
# for now, until submission reponse arrives # for now, until submission reponse arrives
line.hide_labels() line.hide_labels()

View File

@ -49,16 +49,21 @@ from ._position import (
SettingsPane, SettingsPane,
) )
from ._forms import FieldsForm from ._forms import FieldsForm
# from ._label import FormatLabel
from ._window import MultiStatus from ._window import MultiStatus
from ..clearing._messages import Order, BrokerdPosition from ..clearing._messages import (
Order,
Status,
# BrokerdOrder,
# BrokerdStatus,
BrokerdPosition,
)
from ._forms import open_form_input_handling from ._forms import open_form_input_handling
log = get_logger(__name__) log = get_logger(__name__)
class OrderDialog(Struct): class Dialog(Struct):
''' '''
Trade dialogue meta-data describing the lifetime Trade dialogue meta-data describing the lifetime
of an order submission to ``emsd`` from a chart. of an order submission to ``emsd`` from a chart.
@ -141,7 +146,7 @@ class OrderMode:
current_pp: Optional[PositionTracker] = None current_pp: Optional[PositionTracker] = None
active: bool = False active: bool = False
name: str = 'order' name: str = 'order'
dialogs: dict[str, OrderDialog] = field(default_factory=dict) dialogs: dict[str, Dialog] = field(default_factory=dict)
_colors = { _colors = {
'alert': 'alert_yellow', 'alert': 'alert_yellow',
@ -152,10 +157,7 @@ class OrderMode:
def line_from_order( def line_from_order(
self, self,
order: Order, order: Order,
symbol: Symbol,
**line_kwargs, **line_kwargs,
) -> LevelLine: ) -> LevelLine:
@ -173,8 +175,8 @@ class OrderMode:
color=self._colors[order.action], color=self._colors[order.action],
dotted=True if ( dotted=True if (
order.exec_mode == 'dark' and order.exec_mode == 'dark'
order.action != 'alert' and order.action != 'alert'
) else False, ) else False,
**line_kwargs, **line_kwargs,
@ -236,7 +238,6 @@ class OrderMode:
line = self.line_from_order( line = self.line_from_order(
order, order,
symbol,
show_markers=True, show_markers=True,
# just for the stage line to avoid # just for the stage line to avoid
@ -262,25 +263,28 @@ class OrderMode:
def submit_order( def submit_order(
self, self,
send_msg: bool = True,
order: Optional[Order] = None,
) -> OrderDialog: ) -> Dialog:
''' '''
Send execution order to EMS return a level line to Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
''' '''
if not order:
staged = self._staged_order staged = self._staged_order
symbol: Symbol = staged.symbol
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# symbol: Symbol = staged.symbol
# format order data for ems # format order data for ems
order = staged.copy() order = staged.copy()
order.oid = oid order.oid = oid
order.symbol = symbol.front_fqsn()
order.symbol = order.symbol.front_fqsn()
line = self.line_from_order( line = self.line_from_order(
order, order,
symbol,
show_markers=True, show_markers=True,
only_show_markers_on_hover=True, only_show_markers_on_hover=True,
@ -298,17 +302,17 @@ class OrderMode:
# color once the submission ack arrives. # color once the submission ack arrives.
self.lines.submit_line( self.lines.submit_line(
line=line, line=line,
uuid=oid, uuid=order.oid,
) )
dialog = OrderDialog( dialog = Dialog(
uuid=oid, uuid=order.oid,
order=order, order=order,
symbol=symbol, symbol=order.symbol,
line=line, line=line,
last_status_close=self.multistatus.open_status( last_status_close=self.multistatus.open_status(
f'submitting {self._trigger_type}-{order.action}', f'submitting {order.exec_mode}-{order.action}',
final_msg=f'submitted {self._trigger_type}-{order.action}', final_msg=f'submitted {order.exec_mode}-{order.action}',
clear_on_next=True, clear_on_next=True,
) )
) )
@ -318,14 +322,21 @@ class OrderMode:
# enter submission which will be popped once a response # enter submission which will be popped once a response
# from the EMS is received to move the order to a different# status # from the EMS is received to move the order to a different# status
self.dialogs[oid] = dialog self.dialogs[order.oid] = dialog
# hook up mouse drag handlers # hook up mouse drag handlers
line._on_drag_start = self.order_line_modify_start line._on_drag_start = self.order_line_modify_start
line._on_drag_end = self.order_line_modify_complete line._on_drag_end = self.order_line_modify_complete
# send order cmd to ems # send order cmd to ems
if send_msg:
self.book.send(order) self.book.send(order)
else:
# just register for control over this order
# TODO: some kind of mini-perms system here based on
# an out-of-band tagging/auth sub-sys for multiplayer
# order control?
self.book._sent_orders[order.oid] = order
return dialog return dialog
@ -363,7 +374,7 @@ class OrderMode:
self, self,
uuid: str uuid: str
) -> OrderDialog: ) -> Dialog:
''' '''
Order submitted status event handler. Order submitted status event handler.
@ -418,7 +429,7 @@ class OrderMode:
self, self,
uuid: str, uuid: str,
msg: Dict[str, Any], msg: Status,
) -> None: ) -> None:
@ -442,7 +453,7 @@ class OrderMode:
# TODO: add in standard fill/exec info that maybe we # TODO: add in standard fill/exec info that maybe we
# pack in a broker independent way? # pack in a broker independent way?
f'{msg["resp"]}: {msg["trigger_price"]}', f'{msg.resp}: {msg.req.price}',
], ],
) )
log.runtime(result) log.runtime(result)
@ -502,7 +513,7 @@ class OrderMode:
oid = dialog.uuid oid = dialog.uuid
cancel_status_close = self.multistatus.open_status( cancel_status_close = self.multistatus.open_status(
f'cancelling order {oid[:6]}', f'cancelling order {oid}',
group_key=key, group_key=key,
) )
dialog.last_status_close = cancel_status_close dialog.last_status_close = cancel_status_close
@ -512,6 +523,45 @@ class OrderMode:
return ids return ids
def load_unknown_dialog_from_msg(
self,
msg: Status,
) -> Dialog:
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
order = Order(**msg.req)
oid = str(msg.oid)
symbol = order.symbol
# TODO: MEGA UGGG ZONEEEE!
src = msg.src
if (
src
and src != 'dark'
and src not in symbol
):
fqsn = symbol + '.' + src
brokername = src
else:
fqsn = symbol
*head, brokername = fqsn.rsplit('.')
# fill out complex fields
order.oid = str(order.oid)
order.brokers = [brokername]
order.symbol = Symbol.from_fqsn(
fqsn=fqsn,
info={},
)
dialog = self.submit_order(
send_msg=False,
order=order,
)
assert self.dialogs[oid] == dialog
return dialog
@asynccontextmanager @asynccontextmanager
async def open_order_mode( async def open_order_mode(
@ -549,6 +599,7 @@ async def open_order_mode(
trades_stream, trades_stream,
position_msgs, position_msgs,
brokerd_accounts, brokerd_accounts,
ems_dialog_msgs,
), ),
trio.open_nursery() as tn, trio.open_nursery() as tn,
@ -596,10 +647,10 @@ async def open_order_mode(
sym = msg['symbol'] sym = msg['symbol']
if ( if (
sym == symkey or (sym == symkey) or (
# mega-UGH, i think we need to fix the FQSN stuff sooner # mega-UGH, i think we need to fix the FQSN
# then later.. # stuff sooner then later..
sym == symkey.removesuffix(f'.{broker}') sym == symkey.removesuffix(f'.{broker}'))
): ):
pps_by_account[acctid] = msg pps_by_account[acctid] = msg
@ -653,7 +704,7 @@ async def open_order_mode(
# setup order mode sidepane widgets # setup order mode sidepane widgets
form: FieldsForm = chart.sidepane form: FieldsForm = chart.sidepane
form.vbox.setSpacing( form.vbox.setSpacing(
int((1 + 5/8)*_font.px_size) int((1 + 5 / 8) * _font.px_size)
) )
from ._feedstatus import mk_feed_label from ._feedstatus import mk_feed_label
@ -703,7 +754,7 @@ async def open_order_mode(
order_pane.order_mode = mode order_pane.order_mode = mode
# select a pp to track # select a pp to track
tracker = trackers[pp_account] tracker: PositionTracker = trackers[pp_account]
mode.current_pp = tracker mode.current_pp = tracker
tracker.show() tracker.show()
tracker.hide_info() tracker.hide_info()
@ -755,38 +806,61 @@ async def open_order_mode(
# to handle input since the ems connection is ready # to handle input since the ems connection is ready
started.set() started.set()
for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side?
# msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid'])
msg['brokerd_msg'] = msg
await process_trade_msg(
mode,
book,
msg,
)
tn.start_soon( tn.start_soon(
process_trades_and_update_ui, process_trades_and_update_ui,
tn,
feed,
mode,
trades_stream, trades_stream,
mode,
book, book,
) )
yield mode yield mode
async def process_trades_and_update_ui( async def process_trades_and_update_ui(
n: trio.Nursery,
feed: Feed,
mode: OrderMode,
trades_stream: tractor.MsgStream, trades_stream: tractor.MsgStream,
mode: OrderMode,
book: OrderBook, book: OrderBook,
) -> None: ) -> None:
get_index = mode.chart.get_index
global _pnl_tasks
# this is where we receive **back** messages # this is where we receive **back** messages
# about executions **from** the EMS actor # about executions **from** the EMS actor
async for msg in trades_stream: async for msg in trades_stream:
await process_trade_msg(
mode,
book,
msg,
)
async def process_trade_msg(
mode: OrderMode,
book: OrderBook,
msg: dict,
) -> tuple[Dialog, Status]:
get_index = mode.chart.get_index
fmsg = pformat(msg) fmsg = pformat(msg)
log.info(f'Received order msg:\n{fmsg}') log.debug(f'Received order msg:\n{fmsg}')
name = msg['name'] name = msg['name']
if name in ( if name in (
'position', 'position',
): ):
@ -811,95 +885,102 @@ async def process_trades_and_update_ui(
# short circuit to next msg to avoid # short circuit to next msg to avoid
# unnecessary msg content lookups # unnecessary msg content lookups
continue return
resp = msg['resp'] msg = Status(**msg)
oid = msg['oid'] resp = msg.resp
oid = msg.oid
dialog: Dialog = mode.dialogs.get(oid)
dialog = mode.dialogs.get(oid) match msg:
if dialog is None: case Status(resp='dark_open' | 'open'):
log.warning(f'received msg for untracked dialog:\n{fmsg}')
# TODO: enable pure tracking / mirroring of dialogs
# is desired.
continue
# record message to dialog tracking
dialog.msgs[oid] = msg
# response to 'action' request (buy/sell)
if resp in (
'dark_submitted',
'broker_submitted'
):
if dialog is not None:
# show line label once order is live # show line label once order is live
mode.on_submit(oid) mode.on_submit(oid)
# resp to 'cancel' request or error condition else:
# for action request log.warning(
elif resp in ( f'received msg for untracked dialog:\n{fmsg}'
'broker_inactive', )
'broker_errored', assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}'
sym = mode.chart.linked.symbol
fqsn = sym.front_fqsn()
order = Order(**msg.req)
if (
((order.symbol + f'.{msg.src}') == fqsn)
# a existing dark order for the same symbol
or (
order.symbol == fqsn
and (msg.src == 'dark') or (msg.src in fqsn)
)
): ):
dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid)
# return dialog, msg
case Status(resp='error'):
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
broker_msg = msg['brokerd_msg'] broker_msg = msg.brokerd_msg
log.error( log.error(
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
) )
elif resp in ( case Status(resp='canceled'):
'broker_cancelled',
'dark_cancelled'
):
# delete level line from view # delete level line from view
mode.on_cancel(oid) mode.on_cancel(oid)
broker_msg = msg['brokerd_msg'] req = Order(**msg.req)
log.cancel( log.cancel(f'Canceled {req.action}:{oid}')
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
)
elif resp in ( case Status(
'dark_triggered' resp='triggered',
# req=Order(exec_mode='dark') # TODO:
req={'exec_mode': 'dark'},
): ):
# TODO: UX for a "pending" clear/live order
log.info(f'Dark order triggered for {fmsg}') log.info(f'Dark order triggered for {fmsg}')
elif resp in ( case Status(
'alert_triggered' resp='triggered',
# req=Order(exec_mode='live', action='alert') as req, # TODO
req={'exec_mode': 'live', 'action': 'alert'} as req,
): ):
# should only be one "fill" for an alert # should only be one "fill" for an alert
# add a triangle and remove the level line # add a triangle and remove the level line
req = Order(**req)
mode.on_fill( mode.on_fill(
oid, oid,
price=msg['trigger_price'], price=req.price,
arrow_index=get_index(time.time()), arrow_index=get_index(time.time()),
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
msg.req = req
await mode.on_exec(oid, msg) await mode.on_exec(oid, msg)
# response to completed 'action' request for buy/sell # response to completed 'dialog' for order request
elif resp in ( case Status(
'broker_executed', resp='closed',
# req=Order() as req, # TODO
req=req,
): ):
# right now this is just triggering a system alert msg.req = Order(**req)
await mode.on_exec(oid, msg) await mode.on_exec(oid, msg)
if msg['brokerd_msg']['remaining'] == 0:
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually # each clearing tick is responded individually
elif resp in ( case Status(resp='fill'):
'broker_filled',
):
# handle out-of-piker fills reporting?
known_order = book._sent_orders.get(oid) known_order = book._sent_orders.get(oid)
if not known_order: if not known_order:
log.warning(f'order {oid} is unknown') log.warning(f'order {oid} is unknown')
continue return
action = known_order.action action = known_order.action
details = msg['brokerd_msg'] details = msg.brokerd_msg
# TODO: some kinda progress system # TODO: some kinda progress system
mode.on_fill( mode.on_fill(
@ -914,3 +995,9 @@ async def process_trades_and_update_ui(
# TODO: how should we look this up? # TODO: how should we look this up?
# tracker = mode.trackers[msg['account']] # tracker = mode.trackers[msg['account']]
# tracker.live_pp.fills.append(msg) # tracker.live_pp.fills.append(msg)
# record message to dialog tracking
if dialog:
dialog.msgs[oid] = msg
return dialog, msg