Compare commits
	
		
			21 Commits 
		
	
	
		
			gitea_feat
			...
			dict_diffe
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 2f6e3ad03f | |
|  | b75683879a | |
|  | db8a3dd1b7 | |
|  | 2d92ed2052 | |
|  | 0756cb0289 | |
|  | 66f7dd9020 | |
|  | 9782107153 | |
|  | 1f43f660fe | |
|  | d3b7d0e247 | |
|  | 700dbf0e2b | |
|  | b52c4092f3 | |
|  | 7fe3e3f482 | |
|  | bbbdcad33b | |
|  | a3812cd169 | |
|  | 5ac5743c66 | |
|  | aa204228ab | |
|  | 0bd8f2bcd9 | |
|  | 334f512ad3 | |
|  | 71cca4ceda | |
|  | 0d332427e2 | |
|  | 02980282cd | 
|  | @ -36,8 +36,6 @@ from trio_typing import TaskStatus | |||
| import tractor | ||||
| from ib_insync.contract import ( | ||||
|     Contract, | ||||
|     # Option, | ||||
|     # Forex, | ||||
| ) | ||||
| from ib_insync.order import ( | ||||
|     Trade, | ||||
|  | @ -61,6 +59,8 @@ from piker.pp import ( | |||
| ) | ||||
| from piker.log import get_console_log | ||||
| from piker.clearing._messages import ( | ||||
|     Order, | ||||
|     Status, | ||||
|     BrokerdOrder, | ||||
|     BrokerdOrderAck, | ||||
|     BrokerdStatus, | ||||
|  | @ -123,11 +123,13 @@ async def handle_order_requests( | |||
|                 f'An IB account number for name {account} is not found?\n' | ||||
|                 'Make sure you have all TWS and GW instances running.' | ||||
|             ) | ||||
|             await ems_order_stream.send(BrokerdError( | ||||
|                 oid=request_msg['oid'], | ||||
|                 symbol=request_msg['symbol'], | ||||
|                 reason=f'No account found: `{account}` ?', | ||||
|             )) | ||||
|             await ems_order_stream.send( | ||||
|                 BrokerdError( | ||||
|                     oid=request_msg['oid'], | ||||
|                     symbol=request_msg['symbol'], | ||||
|                     reason=f'No account found: `{account}` ?', | ||||
|                 ) | ||||
|             ) | ||||
|             continue | ||||
| 
 | ||||
|         client = _accounts2clients.get(account) | ||||
|  | @ -147,6 +149,14 @@ async def handle_order_requests( | |||
|             # validate | ||||
|             order = BrokerdOrder(**request_msg) | ||||
| 
 | ||||
|             # XXX: by default 0 tells ``ib_insync`` methods that | ||||
|             # there is no existing order so ask the client to create | ||||
|             # a new one (which it seems to do by allocating an int | ||||
|             # counter - collision prone..) | ||||
|             reqid = order.reqid | ||||
|             if reqid is not None: | ||||
|                 reqid = int(reqid) | ||||
| 
 | ||||
|             # call our client api to submit the order | ||||
|             reqid = client.submit_limit( | ||||
|                 oid=order.oid, | ||||
|  | @ -155,12 +165,7 @@ async def handle_order_requests( | |||
|                 action=order.action, | ||||
|                 size=order.size, | ||||
|                 account=acct_number, | ||||
| 
 | ||||
|                 # XXX: by default 0 tells ``ib_insync`` methods that | ||||
|                 # there is no existing order so ask the client to create | ||||
|                 # a new one (which it seems to do by allocating an int | ||||
|                 # counter - collision prone..) | ||||
|                 reqid=order.reqid, | ||||
|                 reqid=reqid, | ||||
|             ) | ||||
|             if reqid is None: | ||||
|                 await ems_order_stream.send(BrokerdError( | ||||
|  | @ -180,9 +185,9 @@ async def handle_order_requests( | |||
|                 ) | ||||
|             ) | ||||
| 
 | ||||
|         elif action == 'cancel': | ||||
|         if action == 'cancel': | ||||
|             msg = BrokerdCancel(**request_msg) | ||||
|             client.submit_cancel(reqid=msg.reqid) | ||||
|             client.submit_cancel(reqid=int(msg.reqid)) | ||||
| 
 | ||||
|         else: | ||||
|             log.error(f'Unknown order command: {request_msg}') | ||||
|  | @ -357,11 +362,24 @@ async def update_and_audit_msgs( | |||
|                 # presume we're at least not more in the shit then we | ||||
|                 # thought. | ||||
|                 if diff: | ||||
|                     reverse_split_ratio = pikersize / ibsize | ||||
|                     split_ratio = 1/reverse_split_ratio | ||||
| 
 | ||||
|                     if split_ratio >= reverse_split_ratio: | ||||
|                         entry = f'split_ratio = {int(split_ratio)}' | ||||
|                     else: | ||||
|                         entry = f'split_ratio = 1/{int(reverse_split_ratio)}' | ||||
| 
 | ||||
|                     raise ValueError( | ||||
|                         f'POSITION MISMATCH ib <-> piker ledger:\n' | ||||
|                         f'ib: {ibppmsg}\n' | ||||
|                         f'piker: {msg}\n' | ||||
|                         'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' | ||||
|                         f'reverse_split_ratio: {reverse_split_ratio}\n' | ||||
|                         f'split_ratio: {split_ratio}\n\n' | ||||
|                         'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n' | ||||
|                         'If you are expecting a (reverse) split in this ' | ||||
|                         'instrument you should probably put the following ' | ||||
|                         f'in the `pps.toml` section:\n{entry}' | ||||
|                     ) | ||||
|                     msg.size = ibsize | ||||
| 
 | ||||
|  | @ -439,7 +457,6 @@ async def trades_dialogue( | |||
|     # we might also want to delegate a specific actor for | ||||
|     # ledger writing / reading for speed? | ||||
|     async with ( | ||||
|         # trio.open_nursery() as nurse, | ||||
|         open_client_proxies() as (proxies, aioclients), | ||||
|     ): | ||||
|         # Open a trade ledgers stack for appending trade records over | ||||
|  | @ -468,6 +485,52 @@ async def trades_dialogue( | |||
| 
 | ||||
|                 client = aioclients[account] | ||||
| 
 | ||||
|                 trades: list[Trade] = client.ib.openTrades() | ||||
|                 order_msgs = [] | ||||
|                 for trade in trades: | ||||
| 
 | ||||
|                     order = trade.order | ||||
|                     quant = trade.order.totalQuantity | ||||
|                     action = order.action.lower() | ||||
|                     size = { | ||||
|                         'sell': -1, | ||||
|                         'buy': 1, | ||||
|                     }[action] * quant | ||||
|                     con = trade.contract | ||||
| 
 | ||||
|                     # TODO: in the case of the SMART venue (aka ib's | ||||
|                     # router-clearing sys) we probably should handle | ||||
|                     # showing such orders overtop of the fqsn for the | ||||
|                     # primary exchange, how to map this easily is going | ||||
|                     # to be a bit tricky though? | ||||
|                     deats = await proxy.con_deats(contracts=[con]) | ||||
|                     fqsn = list(deats)[0] | ||||
| 
 | ||||
|                     reqid = order.orderId | ||||
| 
 | ||||
|                     # TODO: maybe embed a ``BrokerdOrder`` instead | ||||
|                     # since then we can directly load it on the client | ||||
|                     # side in the order mode loop? | ||||
|                     msg = Status( | ||||
|                         time_ns=time.time_ns(), | ||||
|                         resp='open', | ||||
|                         oid=str(reqid), | ||||
|                         reqid=reqid, | ||||
| 
 | ||||
|                         # embedded order info | ||||
|                         req=Order( | ||||
|                             action=action, | ||||
|                             exec_mode='live', | ||||
|                             oid=str(reqid), | ||||
|                             symbol=fqsn, | ||||
|                             account=accounts_def.inverse[order.account], | ||||
|                             price=order.lmtPrice, | ||||
|                             size=size, | ||||
|                         ), | ||||
|                         src='ib', | ||||
|                     ) | ||||
|                     order_msgs.append(msg) | ||||
| 
 | ||||
|                 # process pp value reported from ib's system. we only use these | ||||
|                 # to cross-check sizing since average pricing on their end uses | ||||
|                 # the so called (bs) "FIFO" style which more or less results in | ||||
|  | @ -480,6 +543,7 @@ async def trades_dialogue( | |||
|                     # sure know which positions to update from the ledger if | ||||
|                     # any are missing from the ``pps.toml`` | ||||
|                     bsuid, msg = pack_position(pos) | ||||
| 
 | ||||
|                     acctid = msg.account = accounts_def.inverse[msg.account] | ||||
|                     acctid = acctid.strip('ib.') | ||||
|                     cids2pps[(acctid, bsuid)] = msg | ||||
|  | @ -493,9 +557,7 @@ async def trades_dialogue( | |||
|                         or pp.size != msg.size | ||||
|                     ): | ||||
|                         trans = norm_trade_records(ledger) | ||||
|                         updated = table.update_from_trans(trans) | ||||
|                         pp = updated[bsuid] | ||||
| 
 | ||||
|                         table.update_from_trans(trans) | ||||
|                         # update trades ledgers for all accounts from connected | ||||
|                         # api clients which report trades for **this session**. | ||||
|                         trades = await proxy.trades() | ||||
|  | @ -521,9 +583,28 @@ async def trades_dialogue( | |||
|                             trans = trans_by_acct.get(acctid) | ||||
|                             if trans: | ||||
|                                 table.update_from_trans(trans) | ||||
|                                 updated = table.update_from_trans(trans) | ||||
| 
 | ||||
|                         assert msg.size == pp.size, 'WTF' | ||||
|                         # XXX: not sure exactly why it wouldn't be in | ||||
|                         # the updated output (maybe this is a bug?) but | ||||
|                         # if you create a pos from TWS and then load it | ||||
|                         # from the api trades it seems we get a key | ||||
|                         # error from ``update[bsuid]`` ? | ||||
|                         pp = table.pps.get(bsuid) | ||||
|                         if not pp: | ||||
|                             log.error( | ||||
|                                 f'The contract id for {msg} may have ' | ||||
|                                 f'changed to {bsuid}\nYou may need to ' | ||||
|                                 'adjust your ledger for this, skipping ' | ||||
|                                 'for now.' | ||||
|                             ) | ||||
|                             continue | ||||
| 
 | ||||
|                         if msg.size != pp.size: | ||||
|                             log.error( | ||||
|                                 'Position mismatch {pp.symbol.front_fqsn()}:\n' | ||||
|                                 f'ib: {msg.size}\n' | ||||
|                                 f'piker: {pp.size}\n' | ||||
|                             ) | ||||
| 
 | ||||
|                 active_pps, closed_pps = table.dump_active() | ||||
| 
 | ||||
|  | @ -575,6 +656,10 @@ async def trades_dialogue( | |||
|                 ctx.open_stream() as ems_stream, | ||||
|                 trio.open_nursery() as n, | ||||
|             ): | ||||
|                 # relay existing open orders to ems | ||||
|                 for msg in order_msgs: | ||||
|                     await ems_stream.send(msg) | ||||
| 
 | ||||
|                 trade_event_stream = await n.start(open_trade_event_stream) | ||||
|                 clients.append((client, trade_event_stream)) | ||||
| 
 | ||||
|  | @ -586,6 +671,7 @@ async def trades_dialogue( | |||
|                 for client, stream in clients: | ||||
|                     n.start_soon( | ||||
|                         deliver_trade_events, | ||||
|                         n, | ||||
|                         stream, | ||||
|                         ems_stream, | ||||
|                         accounts_def, | ||||
|  | @ -661,8 +747,24 @@ async def emit_pp_update( | |||
|     await ems_stream.send(msg) | ||||
| 
 | ||||
| 
 | ||||
| _statuses: dict[str, str] = { | ||||
|     'cancelled': 'canceled', | ||||
|     'submitted': 'open', | ||||
|     # XXX: just pass these through? it duplicates actual fill events other | ||||
|     # then the case where you the `.remaining == 0` case which is our | ||||
|     # 'closed'` case. | ||||
|     # 'filled': 'pending', | ||||
|     # 'pendingsubmit': 'pending', | ||||
| 
 | ||||
|     # TODO: see a current ``ib_insync`` issue around this: | ||||
|     # https://github.com/erdewit/ib_insync/issues/363 | ||||
|     'inactive': 'pending', | ||||
| } | ||||
| 
 | ||||
| 
 | ||||
| async def deliver_trade_events( | ||||
| 
 | ||||
|     nurse: trio.Nursery, | ||||
|     trade_event_stream: trio.MemoryReceiveChannel, | ||||
|     ems_stream: tractor.MsgStream, | ||||
|     accounts_def: dict[str, str],  # eg. `'ib.main'` -> `'DU999999'` | ||||
|  | @ -718,6 +820,45 @@ async def deliver_trade_events( | |||
|                 # unwrap needed data from ib_insync internal types | ||||
|                 trade: Trade = item | ||||
|                 status: OrderStatus = trade.orderStatus | ||||
|                 ib_status_key = status.status.lower() | ||||
| 
 | ||||
|                 acctid = accounts_def.inverse[trade.order.account] | ||||
| 
 | ||||
|                 # double check there is no error when | ||||
|                 # cancelling.. gawwwd | ||||
|                 if ib_status_key == 'cancelled': | ||||
|                     last_log = trade.log[-1] | ||||
|                     if ( | ||||
|                         last_log.message | ||||
|                         and 'Error' not in last_log.message | ||||
|                     ): | ||||
|                         ib_status_key = trade.log[-2].status | ||||
|                         print(ib_status_key) | ||||
| 
 | ||||
|                 elif ib_status_key == 'inactive': | ||||
|                     async def sched_cancel(): | ||||
|                         log.warning( | ||||
|                             'OH GAWD an inactive order..scheduling a cancel\n' | ||||
|                             f'{pformat(item)}' | ||||
|                         ) | ||||
|                         proxy = proxies[acctid] | ||||
|                         await proxy.submit_cancel(reqid=trade.order.orderId) | ||||
|                         await trio.sleep(1) | ||||
|                         nurse.start_soon(sched_cancel) | ||||
| 
 | ||||
|                     nurse.start_soon(sched_cancel) | ||||
| 
 | ||||
|                 status_key = ( | ||||
|                     _statuses.get(ib_status_key) | ||||
|                     or ib_status_key.lower() | ||||
|                 ) | ||||
| 
 | ||||
|                 remaining = status.remaining | ||||
|                 if ( | ||||
|                     status_key == 'filled' | ||||
|                     and remaining == 0 | ||||
|                 ): | ||||
|                     status_key = 'closed' | ||||
| 
 | ||||
|                 # skip duplicate filled updates - we get the deats | ||||
|                 # from the execution details event | ||||
|  | @ -728,14 +869,14 @@ async def deliver_trade_events( | |||
|                     account=accounts_def.inverse[trade.order.account], | ||||
| 
 | ||||
|                     # everyone doin camel case.. | ||||
|                     status=status.status.lower(),  # force lower case | ||||
|                     status=status_key,  # force lower case | ||||
| 
 | ||||
|                     filled=status.filled, | ||||
|                     reason=status.whyHeld, | ||||
| 
 | ||||
|                     # this seems to not be necessarily up to date in the | ||||
|                     # execDetails event.. so we have to send it here I guess? | ||||
|                     remaining=status.remaining, | ||||
|                     remaining=remaining, | ||||
| 
 | ||||
|                     broker_details={'name': 'ib'}, | ||||
|                 ) | ||||
|  | @ -870,17 +1011,25 @@ async def deliver_trade_events( | |||
|                 if err['reqid'] == -1: | ||||
|                     log.error(f'TWS external order error:\n{pformat(err)}') | ||||
| 
 | ||||
|                 # TODO: what schema for this msg if we're going to make it | ||||
|                 # portable across all backends? | ||||
|                 # msg = BrokerdError(**err) | ||||
|                 # TODO: we don't want to relay data feed / lookup errors | ||||
|                 # so we need some further filtering logic here.. | ||||
|                 # for most cases the 'status' block above should take | ||||
|                 # care of this. | ||||
|                 # await ems_stream.send(BrokerdStatus( | ||||
|                 #     status='error', | ||||
|                 #     reqid=err['reqid'], | ||||
|                 #     reason=err['reason'], | ||||
|                 #     time_ns=time.time_ns(), | ||||
|                 #     account=accounts_def.inverse[trade.order.account], | ||||
|                 #     broker_details={'name': 'ib'}, | ||||
|                 # )) | ||||
| 
 | ||||
|             case 'position': | ||||
| 
 | ||||
|                 cid, msg = pack_position(item) | ||||
|                 log.info(f'New IB position msg: {msg}') | ||||
|                 # acctid = msg.account = accounts_def.inverse[msg.account] | ||||
|                 # cuck ib and it's shitty fifo sys for pps! | ||||
|                 # await ems_stream.send(msg) | ||||
|                 continue | ||||
| 
 | ||||
|             case 'event': | ||||
| 
 | ||||
|  |  | |||
|  | @ -101,3 +101,30 @@ def percent_change( | |||
|     new: float, | ||||
| ) -> float: | ||||
|     return pnl(init, new) * 100. | ||||
| 
 | ||||
| 
 | ||||
| def diff_dict( | ||||
|     d1: dict, | ||||
|     d2: dict, | ||||
| 
 | ||||
| ) -> dict: | ||||
|     d1_keys = set(d1.keys()) | ||||
|     d2_keys = set(d2.keys()) | ||||
|     shared_keys = d1_keys.intersection(d2_keys) | ||||
|     shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]} | ||||
|     added_keys = d2_keys - d1_keys | ||||
|     added_deltas = {o: (None, d2[o]) for o in added_keys} | ||||
|     deltas = {**shared_deltas, **added_deltas} | ||||
|     return parse_deltas(deltas) | ||||
| 
 | ||||
| 
 | ||||
| def parse_deltas(deltas: dict) -> dict: | ||||
|     res = {} | ||||
|     for k, v in deltas.items(): | ||||
|         if isinstance(v[0], dict): | ||||
|             tmp = diff_dict(v[0], v[1]) | ||||
|             if tmp: | ||||
|                 res[k] = tmp | ||||
|         else: | ||||
|             res[k] = v[1] | ||||
|     return res | ||||
|  |  | |||
|  | @ -83,7 +83,13 @@ class OrderBook: | |||
|         """Cancel an order (or alert) in the EMS. | ||||
| 
 | ||||
|         """ | ||||
|         cmd = self._sent_orders[uuid] | ||||
|         cmd = self._sent_orders.get(uuid) | ||||
|         if not cmd: | ||||
|             log.error( | ||||
|                 f'Unknown order {uuid}!?\n' | ||||
|                 f'Maybe there is a stale entry or line?\n' | ||||
|                 f'You should report this as a bug!' | ||||
|             ) | ||||
|         msg = Cancel( | ||||
|             oid=uuid, | ||||
|             symbol=cmd.symbol, | ||||
|  | @ -149,10 +155,17 @@ async def relay_order_cmds_from_sync_code( | |||
|     book = get_orders() | ||||
|     async with book._from_order_book.subscribe() as orders_stream: | ||||
|         async for cmd in orders_stream: | ||||
|             if cmd.symbol == symbol_key: | ||||
|                 log.info(f'Send order cmd:\n{pformat(cmd)}') | ||||
|             sym = cmd.symbol | ||||
|             msg = pformat(cmd) | ||||
|             if sym == symbol_key: | ||||
|                 log.info(f'Send order cmd:\n{msg}') | ||||
|                 # send msg over IPC / wire | ||||
|                 await to_ems_stream.send(cmd) | ||||
|             else: | ||||
|                 log.warning( | ||||
|                     f'Ignoring unmatched order cmd for {sym} != {symbol_key}:' | ||||
|                     f'\n{msg}' | ||||
|                 ) | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
|  | @ -220,11 +233,19 @@ async def open_ems( | |||
|                 fqsn=fqsn, | ||||
|                 exec_mode=mode, | ||||
| 
 | ||||
|             ) as (ctx, (positions, accounts)), | ||||
|             ) as ( | ||||
|                 ctx, | ||||
|                 ( | ||||
|                     positions, | ||||
|                     accounts, | ||||
|                     dialogs, | ||||
|                 ) | ||||
|             ), | ||||
| 
 | ||||
|             # open 2-way trade command stream | ||||
|             ctx.open_stream() as trades_stream, | ||||
|         ): | ||||
|             # start sync code order msg delivery task | ||||
|             async with trio.open_nursery() as n: | ||||
|                 n.start_soon( | ||||
|                     relay_order_cmds_from_sync_code, | ||||
|  | @ -232,4 +253,10 @@ async def open_ems( | |||
|                     trades_stream | ||||
|                 ) | ||||
| 
 | ||||
|                 yield book, trades_stream, positions, accounts | ||||
|                 yield ( | ||||
|                     book, | ||||
|                     trades_stream, | ||||
|                     positions, | ||||
|                     accounts, | ||||
|                     dialogs, | ||||
|                 ) | ||||
|  |  | |||
|  | @ -18,8 +18,8 @@ | |||
| In da suit parlances: "Execution management systems" | ||||
| 
 | ||||
| """ | ||||
| from collections import defaultdict, ChainMap | ||||
| from contextlib import asynccontextmanager | ||||
| from dataclasses import dataclass, field | ||||
| from math import isnan | ||||
| from pprint import pformat | ||||
| import time | ||||
|  | @ -27,6 +27,7 @@ from typing import ( | |||
|     AsyncIterator, | ||||
|     Any, | ||||
|     Callable, | ||||
|     Optional, | ||||
| ) | ||||
| 
 | ||||
| from bidict import bidict | ||||
|  | @ -41,9 +42,16 @@ from ..data.types import Struct | |||
| from .._daemon import maybe_spawn_brokerd | ||||
| from . import _paper_engine as paper | ||||
| from ._messages import ( | ||||
|     Status, Order, | ||||
|     BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, | ||||
|     BrokerdFill, BrokerdError, BrokerdPosition, | ||||
|     Order, | ||||
|     Status, | ||||
|     # Cancel, | ||||
|     BrokerdCancel, | ||||
|     BrokerdOrder, | ||||
|     # BrokerdOrderAck, | ||||
|     BrokerdStatus, | ||||
|     BrokerdFill, | ||||
|     BrokerdError, | ||||
|     BrokerdPosition, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -90,8 +98,7 @@ def mk_check( | |||
|     ) | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class _DarkBook: | ||||
| class _DarkBook(Struct): | ||||
|     ''' | ||||
|     EMS-trigger execution book. | ||||
| 
 | ||||
|  | @ -116,17 +123,24 @@ class _DarkBook: | |||
|                 dict,  # cmd / msg type | ||||
|             ] | ||||
|         ] | ||||
|     ] = field(default_factory=dict) | ||||
|     ] = {} | ||||
| 
 | ||||
|     # tracks most recent values per symbol each from data feed | ||||
|     lasts: dict[ | ||||
|         str, | ||||
|         float, | ||||
|     ] = field(default_factory=dict) | ||||
|     ] = {} | ||||
| 
 | ||||
|     # mapping of piker ems order ids to current brokerd order flow message | ||||
|     _ems_entries: dict[str, str] = field(default_factory=dict) | ||||
|     _ems2brokerd_ids: dict[str, str] = field(default_factory=bidict) | ||||
|     # _ems_entries: dict[str, str] = {} | ||||
|     _active: dict = {} | ||||
| 
 | ||||
|     # mapping of ems dialog ids to msg flow history | ||||
|     _msgflows: defaultdict[ | ||||
|         int, | ||||
|         ChainMap[dict[str, dict]], | ||||
|     ] = defaultdict(ChainMap) | ||||
| 
 | ||||
|     _ems2brokerd_ids: dict[str, str] = bidict() | ||||
| 
 | ||||
| 
 | ||||
| # XXX: this is in place to prevent accidental positions that are too | ||||
|  | @ -181,6 +195,7 @@ async def clear_dark_triggers( | |||
|                 for oid, ( | ||||
|                     pred, | ||||
|                     tf, | ||||
|                     # TODO: send this msg instead? | ||||
|                     cmd, | ||||
|                     percent_away, | ||||
|                     abs_diff_away | ||||
|  | @ -188,9 +203,9 @@ async def clear_dark_triggers( | |||
|                     tuple(execs.items()) | ||||
|                 ): | ||||
|                     if ( | ||||
|                         not pred or | ||||
|                         ttype not in tf or | ||||
|                         not pred(price) | ||||
|                         not pred | ||||
|                         or ttype not in tf | ||||
|                         or not pred(price) | ||||
|                     ): | ||||
|                         # log.runtime( | ||||
|                         #     f'skipping quote for {sym} ' | ||||
|  | @ -200,30 +215,29 @@ async def clear_dark_triggers( | |||
|                         # majority of iterations will be non-matches | ||||
|                         continue | ||||
| 
 | ||||
|                     brokerd_msg: Optional[BrokerdOrder] = None | ||||
|                     match cmd: | ||||
|                         # alert: nothing to do but relay a status | ||||
|                         # back to the requesting ems client | ||||
|                         case { | ||||
|                             'action': 'alert', | ||||
|                         }: | ||||
|                             resp = 'alert_triggered' | ||||
|                         case Order(action='alert'): | ||||
|                             resp = 'triggered' | ||||
| 
 | ||||
|                         # executable order submission | ||||
|                         case { | ||||
|                             'action': action, | ||||
|                             'symbol': symbol, | ||||
|                             'account': account, | ||||
|                             'size': size, | ||||
|                         }: | ||||
|                         case Order( | ||||
|                             action=action, | ||||
|                             symbol=symbol, | ||||
|                             account=account, | ||||
|                             size=size, | ||||
|                         ): | ||||
|                             bfqsn: str = symbol.replace(f'.{broker}', '') | ||||
|                             submit_price = price + abs_diff_away | ||||
|                             resp = 'dark_triggered'  # hidden on client-side | ||||
|                             resp = 'triggered'  # hidden on client-side | ||||
| 
 | ||||
|                             log.info( | ||||
|                                 f'Dark order triggered for price {price}\n' | ||||
|                                 f'Submitting order @ price {submit_price}') | ||||
| 
 | ||||
|                             live_req = BrokerdOrder( | ||||
|                             brokerd_msg = BrokerdOrder( | ||||
|                                 action=action, | ||||
|                                 oid=oid, | ||||
|                                 account=account, | ||||
|  | @ -232,7 +246,8 @@ async def clear_dark_triggers( | |||
|                                 price=submit_price, | ||||
|                                 size=size, | ||||
|                             ) | ||||
|                             await brokerd_orders_stream.send(live_req) | ||||
| 
 | ||||
|                             await brokerd_orders_stream.send(brokerd_msg) | ||||
| 
 | ||||
|                             # mark this entry as having sent an order | ||||
|                             # request.  the entry will be replaced once the | ||||
|  | @ -240,18 +255,19 @@ async def clear_dark_triggers( | |||
|                             # a ``BrokerdOrderAck`` msg including the | ||||
|                             # allocated unique ``BrokerdOrderAck.reqid`` key | ||||
|                             # generated by the broker's own systems. | ||||
|                             book._ems_entries[oid] = live_req | ||||
|                             # book._ems_entries[oid] = live_req | ||||
|                             # book._msgflows[oid].maps.insert(0, live_req) | ||||
| 
 | ||||
|                         case _: | ||||
|                             raise ValueError(f'Invalid dark book entry: {cmd}') | ||||
| 
 | ||||
|                     # fallthrough logic | ||||
|                     resp = Status( | ||||
|                     status = Status( | ||||
|                         oid=oid,  # ems dialog id | ||||
|                         time_ns=time.time_ns(), | ||||
|                         resp=resp, | ||||
|                         trigger_price=price, | ||||
|                         brokerd_msg=cmd, | ||||
|                         req=cmd, | ||||
|                         brokerd_msg=brokerd_msg, | ||||
|                     ) | ||||
| 
 | ||||
|                     # remove exec-condition from set | ||||
|  | @ -262,9 +278,18 @@ async def clear_dark_triggers( | |||
|                             f'pred for {oid} was already removed!?' | ||||
|                         ) | ||||
| 
 | ||||
|                     # update actives | ||||
|                     if cmd.action == 'alert': | ||||
|                         # don't register the alert status (so it won't | ||||
|                         # be reloaded by clients) since it's now | ||||
|                         # complete / closed. | ||||
|                         book._active.pop(oid) | ||||
|                     else: | ||||
|                         book._active[oid] = status | ||||
| 
 | ||||
|                     # send response to client-side | ||||
|                     try: | ||||
|                         await ems_client_order_stream.send(resp) | ||||
|                         await ems_client_order_stream.send(status) | ||||
|                     except ( | ||||
|                         trio.ClosedResourceError, | ||||
|                     ): | ||||
|  | @ -281,8 +306,7 @@ async def clear_dark_triggers( | |||
|         # print(f'execs scan took: {time.time() - start}') | ||||
| 
 | ||||
| 
 | ||||
| @dataclass | ||||
| class TradesRelay: | ||||
| class TradesRelay(Struct): | ||||
| 
 | ||||
|     # for now we keep only a single connection open with | ||||
|     # each ``brokerd`` for simplicity. | ||||
|  | @ -318,7 +342,10 @@ class Router(Struct): | |||
| 
 | ||||
|     # order id to client stream map | ||||
|     clients: set[tractor.MsgStream] = set() | ||||
|     dialogues: dict[str, list[tractor.MsgStream]] = {} | ||||
|     dialogues: dict[ | ||||
|         str, | ||||
|         list[tractor.MsgStream] | ||||
|     ] = {} | ||||
| 
 | ||||
|     # brokername to trades-dialogues streams with ``brokerd`` actors | ||||
|     relays: dict[str, TradesRelay] = {} | ||||
|  | @ -341,11 +368,12 @@ class Router(Struct): | |||
|         loglevel: str, | ||||
| 
 | ||||
|     ) -> tuple[dict, tractor.MsgStream]: | ||||
|         '''Open and yield ``brokerd`` trades dialogue context-stream if none | ||||
|         already exists. | ||||
|         ''' | ||||
|         Open and yield ``brokerd`` trades dialogue context-stream if | ||||
|         none already exists. | ||||
| 
 | ||||
|         ''' | ||||
|         relay = self.relays.get(feed.mod.name) | ||||
|         relay: TradesRelay = self.relays.get(feed.mod.name) | ||||
| 
 | ||||
|         if ( | ||||
|             relay is None | ||||
|  | @ -381,6 +409,22 @@ class Router(Struct): | |||
| 
 | ||||
|             relay.consumers -= 1 | ||||
| 
 | ||||
|     async def client_broadcast( | ||||
|         self, | ||||
|         msg: dict, | ||||
| 
 | ||||
|     ) -> None: | ||||
|         for client_stream in self.clients.copy(): | ||||
|             try: | ||||
|                 await client_stream.send(msg) | ||||
|             except( | ||||
|                 trio.ClosedResourceError, | ||||
|                 trio.BrokenResourceError, | ||||
|             ): | ||||
|                 self.clients.remove(client_stream) | ||||
|                 log.warning( | ||||
|                     f'client for {client_stream} was already closed?') | ||||
| 
 | ||||
| 
 | ||||
| _router: Router = None | ||||
| 
 | ||||
|  | @ -452,7 +496,6 @@ async def open_brokerd_trades_dialogue( | |||
|             async with ( | ||||
|                 open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), | ||||
|                 brokerd_ctx.open_stream() as brokerd_trades_stream, | ||||
| 
 | ||||
|             ): | ||||
|                 # XXX: really we only want one stream per `emsd` actor | ||||
|                 # to relay global `brokerd` order events unless we're | ||||
|  | @ -502,14 +545,9 @@ async def open_brokerd_trades_dialogue( | |||
| 
 | ||||
|                 task_status.started(relay) | ||||
| 
 | ||||
|                 await translate_and_relay_brokerd_events( | ||||
|                     broker, | ||||
|                     brokerd_trades_stream, | ||||
|                     _router, | ||||
|                 ) | ||||
| 
 | ||||
|                 # this context should block here indefinitely until | ||||
|                 # the ``brokerd`` task either dies or is cancelled | ||||
|                 await trio.sleep_forever() | ||||
| 
 | ||||
|         finally: | ||||
|             # parent context must have been closed | ||||
|  | @ -561,15 +599,14 @@ async def translate_and_relay_brokerd_events( | |||
| 
 | ||||
|         broker       ems | ||||
|         'error'  ->  log it locally (for now) | ||||
|         'status' ->  relabel as 'broker_<status>', if complete send 'executed' | ||||
|         'fill'   ->  'broker_filled' | ||||
|         ('status' | 'fill'} ->  relayed through see ``Status`` msg type. | ||||
| 
 | ||||
|     Currently handled status values from IB: | ||||
|         {'presubmitted', 'submitted', 'cancelled', 'inactive'} | ||||
| 
 | ||||
|     ''' | ||||
|     book = router.get_dark_book(broker) | ||||
|     relay = router.relays[broker] | ||||
|     book: _DarkBook = router.get_dark_book(broker) | ||||
|     relay: TradesRelay = router.relays[broker] | ||||
| 
 | ||||
|     assert relay.brokerd_dialogue == brokerd_trades_stream | ||||
| 
 | ||||
|  | @ -601,30 +638,16 @@ async def translate_and_relay_brokerd_events( | |||
| 
 | ||||
|                 # fan-out-relay position msgs immediately by | ||||
|                 # broadcasting updates on all client streams | ||||
|                 for client_stream in router.clients.copy(): | ||||
|                     try: | ||||
|                         await client_stream.send(pos_msg) | ||||
|                     except( | ||||
|                         trio.ClosedResourceError, | ||||
|                         trio.BrokenResourceError, | ||||
|                     ): | ||||
|                         router.clients.remove(client_stream) | ||||
|                         log.warning( | ||||
|                             f'client for {client_stream} was already closed?') | ||||
| 
 | ||||
|                 await router.client_broadcast(pos_msg) | ||||
|                 continue | ||||
| 
 | ||||
|             # BrokerdOrderAck | ||||
|             # initial response to brokerd order request | ||||
|             case { | ||||
|                 'name': 'ack', | ||||
|                 'reqid': reqid,  # brokerd generated order-request id | ||||
|                 'oid': oid,  # ems order-dialog id | ||||
|             } if ( | ||||
|                 entry := book._ems_entries.get(oid) | ||||
|             ): | ||||
|                 # initial response to brokerd order request | ||||
|                 # if name == 'ack': | ||||
| 
 | ||||
|             }: | ||||
|                 # register the brokerd request id (that was generated | ||||
|                 # / created internally by the broker backend) with our | ||||
|                 # local ems order id for reverse lookup later. | ||||
|  | @ -639,23 +662,24 @@ async def translate_and_relay_brokerd_events( | |||
|                 # new order which has not yet be registered into the | ||||
|                 # local ems book, insert it now and handle 2 cases: | ||||
| 
 | ||||
|                 # - the order has previously been requested to be | ||||
|                 # 1. the order has previously been requested to be | ||||
|                 # cancelled by the ems controlling client before we | ||||
|                 # received this ack, in which case we relay that cancel | ||||
|                 # signal **asap** to the backend broker | ||||
|                 action = getattr(entry, 'action', None) | ||||
|                 if action and action == 'cancel': | ||||
|                 # status = book._active.get(oid) | ||||
|                 status_msg = book._active[oid] | ||||
|                 req = status_msg.req | ||||
|                 if req and req.action == 'cancel': | ||||
|                     # assign newly providerd broker backend request id | ||||
|                     entry.reqid = reqid | ||||
|                     # and tell broker to cancel immediately | ||||
|                     status_msg.reqid = reqid | ||||
|                     await brokerd_trades_stream.send(req) | ||||
| 
 | ||||
|                     # tell broker to cancel immediately | ||||
|                     await brokerd_trades_stream.send(entry) | ||||
| 
 | ||||
|                 # - the order is now active and will be mirrored in | ||||
|                 # 2. the order is now active and will be mirrored in | ||||
|                 # our book -> registered as live flow | ||||
|                 else: | ||||
|                     # update the flow with the ack msg | ||||
|                     book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) | ||||
|                     # TODO: should we relay this ack state? | ||||
|                     status_msg.resp = 'pending' | ||||
| 
 | ||||
|                 # no msg to client necessary | ||||
|                 continue | ||||
|  | @ -666,11 +690,9 @@ async def translate_and_relay_brokerd_events( | |||
|                 'oid': oid,  # ems order-dialog id | ||||
|                 'reqid': reqid,  # brokerd generated order-request id | ||||
|                 'symbol': sym, | ||||
|                 'broker_details': details, | ||||
|                 # 'reason': reason, | ||||
|             }: | ||||
|             } if status_msg := book._active.get(oid): | ||||
| 
 | ||||
|                 msg = BrokerdError(**brokerd_msg) | ||||
|                 resp = 'broker_errored' | ||||
|                 log.error(pformat(msg))  # XXX make one when it's blank? | ||||
| 
 | ||||
|                 # TODO: figure out how this will interact with EMS clients | ||||
|  | @ -680,43 +702,64 @@ async def translate_and_relay_brokerd_events( | |||
|                 # some unexpected failure - something we need to think more | ||||
|                 # about.  In most default situations, with composed orders | ||||
|                 # (ex.  brackets), most brokers seem to use a oca policy. | ||||
|                 ems_client_order_stream = router.dialogues[oid] | ||||
|                 status_msg.resp = 'error' | ||||
|                 status_msg.brokerd_msg = msg | ||||
|                 book._active[oid] = status_msg | ||||
|                 await ems_client_order_stream.send(status_msg) | ||||
| 
 | ||||
|             # BrokerdStatus | ||||
|             case { | ||||
|                 'name': 'status', | ||||
|                 'status': status, | ||||
|                 'reqid': reqid,  # brokerd generated order-request id | ||||
|                 # TODO: feels like the wrong msg for this field? | ||||
|                 'remaining': remaining, | ||||
| 
 | ||||
|             } if ( | ||||
|                 oid := book._ems2brokerd_ids.inverse.get(reqid) | ||||
|                 (oid := book._ems2brokerd_ids.inverse.get(reqid)) | ||||
|                 and status in ( | ||||
|                     'canceled', | ||||
|                     'open', | ||||
|                     'closed', | ||||
|                 ) | ||||
|             ): | ||||
|                 msg = BrokerdStatus(**brokerd_msg) | ||||
| 
 | ||||
|                 # TODO: should we flatten out these cases and/or should | ||||
|                 # they maybe even eventually be separate messages? | ||||
|                 if status == 'cancelled': | ||||
|                     log.info(f'Cancellation for {oid} is complete!') | ||||
|                 # TODO: maybe pack this into a composite type that | ||||
|                 # contains both the IPC stream as well the | ||||
|                 # msg-chain/dialog. | ||||
|                 ems_client_order_stream = router.dialogues[oid] | ||||
|                 status_msg = book._active[oid] | ||||
|                 old_resp = status_msg.resp | ||||
|                 status_msg.resp = status | ||||
| 
 | ||||
|                 if status == 'filled': | ||||
|                     # conditional execution is fully complete, no more | ||||
|                     # fills for the noted order | ||||
|                     if not remaining: | ||||
|                 # retrieve existing live flow | ||||
|                 old_reqid = status_msg.reqid | ||||
|                 if old_reqid and old_reqid != reqid: | ||||
|                     log.warning( | ||||
|                         f'Brokerd order id change for {oid}:\n' | ||||
|                         f'{old_reqid} -> {reqid}' | ||||
|                     ) | ||||
| 
 | ||||
|                         resp = 'broker_executed' | ||||
|                 status_msg.reqid = reqid  # THIS LINE IS CRITICAL! | ||||
|                 status_msg.brokerd_msg = msg | ||||
|                 status_msg.src = msg.broker_details['name'] | ||||
|                 await ems_client_order_stream.send(status_msg) | ||||
| 
 | ||||
|                         # be sure to pop this stream from our dialogue set | ||||
|                         # since the order dialogue should be done. | ||||
|                         log.info(f'Execution for {oid} is complete!') | ||||
|                 if status == 'closed': | ||||
|                     log.info(f'Execution for {oid} is complete!') | ||||
| 
 | ||||
|                     # only if we already rxed a fill then probably | ||||
|                     # this clear is fully complete? (frickin ib..) | ||||
|                     if old_resp == 'fill': | ||||
|                         status_msg = book._active.pop(oid) | ||||
| 
 | ||||
|                 elif status == 'canceled': | ||||
|                     log.cancel(f'Cancellation for {oid} is complete!') | ||||
| 
 | ||||
|                 else:  # open | ||||
|                     # relayed from backend but probably not handled so | ||||
|                     # just log it | ||||
|                     else: | ||||
|                         log.info(f'{broker} filled {msg}') | ||||
| 
 | ||||
|                 else: | ||||
|                     # one of {submitted, cancelled} | ||||
|                     resp = 'broker_' + msg.status | ||||
|                     log.info(f'{broker} opened order {msg}') | ||||
| 
 | ||||
|             # BrokerdFill | ||||
|             case { | ||||
|  | @ -728,82 +771,111 @@ async def translate_and_relay_brokerd_events( | |||
|             ): | ||||
|                 # proxy through the "fill" result(s) | ||||
|                 msg = BrokerdFill(**brokerd_msg) | ||||
|                 resp = 'broker_filled' | ||||
|                 log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') | ||||
|                 log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') | ||||
| 
 | ||||
|             # unknown valid message case? | ||||
|             # case { | ||||
|             #     'name': name, | ||||
|             #     'symbol': sym, | ||||
|             #     'reqid': reqid,  # brokerd generated order-request id | ||||
|             #     # 'oid': oid,  # ems order-dialog id | ||||
|             #     'broker_details': details, | ||||
|                 ems_client_order_stream = router.dialogues[oid] | ||||
| 
 | ||||
|             # } if ( | ||||
|             #     book._ems2brokerd_ids.inverse.get(reqid) is None | ||||
|             # ): | ||||
|             #     # TODO: pretty sure we can drop this now? | ||||
|                 # wtf a fill can come after 'closed' from ib? | ||||
|                 status_msg = book._active[oid] | ||||
| 
 | ||||
|             #     # XXX: paper clearing special cases | ||||
|             #     # paper engine race case: ``Client.submit_limit()`` hasn't | ||||
|             #     # returned yet and provided an output reqid to register | ||||
|             #     # locally, so we need to retreive the oid that was already | ||||
|             #     # packed at submission since we already know it ahead of | ||||
|             #     # time | ||||
|             #     paper = details.get('paper_info') | ||||
|             #     ext = details.get('external') | ||||
|                 # only if we already rxed a 'closed' | ||||
|                 # this clear is fully complete? (frickin ib..) | ||||
|                 # if status_msg.resp == 'closed': | ||||
|                 #     status_msg = book._active.pop(oid) | ||||
| 
 | ||||
|             #     if paper: | ||||
|             #         # paperboi keeps the ems id up front | ||||
|             #         oid = paper['oid'] | ||||
|                 status_msg.resp = 'fill' | ||||
|                 status_msg.reqid = reqid | ||||
|                 status_msg.brokerd_msg = msg | ||||
|                 await ems_client_order_stream.send(status_msg) | ||||
| 
 | ||||
|             #     elif ext: | ||||
|             #         # may be an order msg specified as "external" to the | ||||
|             #         # piker ems flow (i.e. generated by some other | ||||
|             #         # external broker backend client (like tws for ib) | ||||
|             #         log.error(f"External trade event {name}@{ext}") | ||||
|             # ``Status`` containing an embedded order msg which | ||||
|             # should be loaded as a "pre-existing open order" from the | ||||
|             # brokerd backend. | ||||
|             case { | ||||
|                 'name': 'status', | ||||
|                 'resp': status, | ||||
|                 'reqid': reqid,  # brokerd generated order-request id | ||||
|             }: | ||||
|                 if ( | ||||
|                     status != 'open' | ||||
|                 ): | ||||
|                     # TODO: check for an oid we might know since it was | ||||
|                     # registered from a previous order/status load? | ||||
|                     log.error( | ||||
|                         f'Unknown/transient status msg:\n' | ||||
|                         f'{pformat(brokerd_msg)}\n' | ||||
|                         'Unable to relay message to client side!?' | ||||
|                     ) | ||||
| 
 | ||||
|             #     else: | ||||
|             #         # something is out of order, we don't have an oid for | ||||
|             #         # this broker-side message. | ||||
|             #         log.error( | ||||
|             #             f'Unknown oid: {oid} for msg {name}:\n' | ||||
|             #             f'{pformat(brokerd_msg)}\n' | ||||
|             #             'Unable to relay message to client side!?' | ||||
|             #         ) | ||||
|                 # TODO: we probably want some kind of "tagging" system | ||||
|                 # for external order submissions like this eventually | ||||
|                 # to be able to more formally handle multi-player | ||||
|                 # trading... | ||||
|                 else: | ||||
|                     # existing open backend order which we broadcast to | ||||
|                     # all currently connected clients. | ||||
|                     log.info( | ||||
|                         f'Relaying existing open order:\n {brokerd_msg}' | ||||
|                     ) | ||||
| 
 | ||||
|             #     continue | ||||
|                     # use backend request id as our ems id though this | ||||
|                     # may end up with collisions? | ||||
|                     status_msg = Status(**brokerd_msg) | ||||
|                     order = Order(**status_msg.req) | ||||
|                     assert order.price and order.size | ||||
|                     status_msg.req = order | ||||
| 
 | ||||
|                     assert status_msg.src  # source tag? | ||||
|                     oid = str(status_msg.reqid) | ||||
| 
 | ||||
|                     # attempt to avoid collisions | ||||
|                     status_msg.reqid = oid | ||||
|                     assert status_msg.resp == 'open' | ||||
| 
 | ||||
|                     # register this existing broker-side dialog | ||||
|                     book._ems2brokerd_ids[oid] = reqid | ||||
|                     book._active[oid] = status_msg | ||||
| 
 | ||||
|                     # fan-out-relay position msgs immediately by | ||||
|                     # broadcasting updates on all client streams | ||||
|                     await router.client_broadcast(status_msg) | ||||
| 
 | ||||
|                 # don't fall through | ||||
|                 continue | ||||
| 
 | ||||
|             # brokerd error | ||||
|             case { | ||||
|                 'name': 'status', | ||||
|                 'status': 'error', | ||||
|             }: | ||||
|                 log.error(f'Broker error:\n{pformat(brokerd_msg)}') | ||||
|                 # XXX: we presume the brokerd cancels its own order | ||||
| 
 | ||||
|             # TOO FAST ``BrokerdStatus`` that arrives | ||||
|             # before the ``BrokerdAck``. | ||||
|             case { | ||||
|                 # XXX: sometimes there is a race with the backend (like | ||||
|                 # `ib` where the pending stauts will be related before | ||||
|                 # the ack, in which case we just ignore the faster | ||||
|                 # pending msg and wait for our expected ack to arrive | ||||
|                 # later (i.e. the first block below should enter). | ||||
|                 'name': 'status', | ||||
|                 'status': status, | ||||
|                 'reqid': reqid, | ||||
|             }: | ||||
|                 status_msg = book._active[oid] | ||||
|                 log.warning( | ||||
|                     'Unhandled broker status for dialog:\n' | ||||
|                     f'{pformat(status_msg)}\n' | ||||
|                     f'{pformat(brokerd_msg)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             case _: | ||||
|                 raise ValueError(f'Brokerd message {brokerd_msg} is invalid') | ||||
| 
 | ||||
|         # retrieve existing live flow | ||||
|         entry = book._ems_entries[oid] | ||||
|         assert entry.oid == oid | ||||
| 
 | ||||
|         old_reqid = entry.reqid | ||||
|         if old_reqid and old_reqid != reqid: | ||||
|             log.warning( | ||||
|                 f'Brokerd order id change for {oid}:\n' | ||||
|                 f'{old_reqid} -> {reqid}' | ||||
|             ) | ||||
| 
 | ||||
|         # Create and relay response status message | ||||
|         # to requesting EMS client | ||||
|         try: | ||||
|             ems_client_order_stream = router.dialogues[oid] | ||||
|             await ems_client_order_stream.send( | ||||
|                 Status( | ||||
|                     oid=oid, | ||||
|                     resp=resp, | ||||
|                     time_ns=time.time_ns(), | ||||
|                     broker_reqid=reqid, | ||||
|                     brokerd_msg=msg, | ||||
|                 ) | ||||
|             ) | ||||
|         except KeyError: | ||||
|             log.error( | ||||
|                 f'Received `brokerd` msg for unknown client with oid: {oid}') | ||||
|         # XXX: ugh sometimes we don't access it? | ||||
|         if status_msg: | ||||
|             del status_msg | ||||
| 
 | ||||
|     # TODO: do we want this to keep things cleaned up? | ||||
|     # it might require a special status from brokerd to affirm the | ||||
|  | @ -829,27 +901,36 @@ async def process_client_order_cmds( | |||
|     async for cmd in client_order_stream: | ||||
|         log.info(f'Received order cmd:\n{pformat(cmd)}') | ||||
| 
 | ||||
|         oid = cmd['oid'] | ||||
|         # CAWT DAMN we need struct support! | ||||
|         oid = str(cmd['oid']) | ||||
| 
 | ||||
|         # register this stream as an active dialogue for this order id | ||||
|         # such that translated message from the brokerd backend can be | ||||
|         # routed (relayed) to **just** that client stream (and in theory | ||||
|         # others who are registered for such order affiliated msgs). | ||||
|         client_dialogues[oid] = client_order_stream | ||||
|         reqid = dark_book._ems2brokerd_ids.inverse.get(oid) | ||||
|         live_entry = dark_book._ems_entries.get(oid) | ||||
| 
 | ||||
|         # any dark/live status which is current | ||||
|         status = dark_book._active.get(oid) | ||||
| 
 | ||||
|         match cmd: | ||||
|             # existing live-broker order cancel | ||||
|             case { | ||||
|                 'action': 'cancel', | ||||
|                 'oid': oid, | ||||
|             } if live_entry: | ||||
|                 reqid = live_entry.reqid | ||||
|                 msg = BrokerdCancel( | ||||
|             } if ( | ||||
|                 (status := dark_book._active.get(oid)) | ||||
|                 and status.resp in ('open', 'pending') | ||||
|             ): | ||||
|                 reqid = status.reqid | ||||
|                 order = status.req | ||||
|                 to_brokerd_msg = BrokerdCancel( | ||||
|                     oid=oid, | ||||
|                     reqid=reqid, | ||||
|                     time_ns=time.time_ns(), | ||||
|                     account=live_entry.account, | ||||
|                     # account=live_entry.account, | ||||
|                     account=order.account, | ||||
|                 ) | ||||
| 
 | ||||
|                 # NOTE: cancel response will be relayed back in messages | ||||
|  | @ -859,39 +940,53 @@ async def process_client_order_cmds( | |||
|                     log.info( | ||||
|                         f'Submitting cancel for live order {reqid}' | ||||
|                     ) | ||||
|                     await brokerd_order_stream.send(msg) | ||||
|                     await brokerd_order_stream.send(to_brokerd_msg) | ||||
| 
 | ||||
|                 else: | ||||
|                     # this might be a cancel for an order that hasn't been | ||||
|                     # acked yet by a brokerd, so register a cancel for when | ||||
|                     # the order ack does show up later such that the brokerd | ||||
|                     # order request can be cancelled at that time. | ||||
|                     dark_book._ems_entries[oid] = msg | ||||
|                     # dark_book._ems_entries[oid] = msg | ||||
|                     # special case for now.. | ||||
|                     status.req = to_brokerd_msg | ||||
| 
 | ||||
|             # dark trigger cancel | ||||
|             case { | ||||
|                 'action': 'cancel', | ||||
|                 'oid': oid, | ||||
|             } if not live_entry: | ||||
|                 try: | ||||
|                     # remove from dark book clearing | ||||
|                     dark_book.orders[symbol].pop(oid, None) | ||||
|             } if ( | ||||
|                 status and status.resp == 'dark_open' | ||||
|                 # or status and status.req | ||||
|             ): | ||||
|                 # remove from dark book clearing | ||||
|                 entry = dark_book.orders[symbol].pop(oid, None) | ||||
|                 if entry: | ||||
|                     ( | ||||
|                         pred, | ||||
|                         tickfilter, | ||||
|                         cmd, | ||||
|                         percent_away, | ||||
|                         abs_diff_away | ||||
|                     ) = entry | ||||
| 
 | ||||
|                     # tell client side that we've cancelled the | ||||
|                     # dark-trigger order | ||||
|                     await client_order_stream.send( | ||||
|                         Status( | ||||
|                             resp='dark_cancelled', | ||||
|                             oid=oid, | ||||
|                             time_ns=time.time_ns(), | ||||
|                         ) | ||||
|                     ) | ||||
|                     status.resp = 'canceled' | ||||
|                     status.req = cmd | ||||
| 
 | ||||
|                     await client_order_stream.send(status) | ||||
|                     # de-register this client dialogue | ||||
|                     router.dialogues.pop(oid) | ||||
|                     dark_book._active.pop(oid) | ||||
| 
 | ||||
|                 except KeyError: | ||||
|                 else: | ||||
|                     log.exception(f'No dark order for {symbol}?') | ||||
| 
 | ||||
|             # TODO: eventually we should be receiving | ||||
|             # this struct on the wire unpacked in a scoped protocol | ||||
|             # setup with ``tractor``. | ||||
| 
 | ||||
|             # live order submission | ||||
|             case { | ||||
|                 'oid': oid, | ||||
|  | @ -899,11 +994,9 @@ async def process_client_order_cmds( | |||
|                 'price': trigger_price, | ||||
|                 'size': size, | ||||
|                 'action': ('buy' | 'sell') as action, | ||||
|                 'exec_mode': 'live', | ||||
|                 'exec_mode': ('live' | 'paper'), | ||||
|             }: | ||||
|                 # TODO: eventually we should be receiving | ||||
|                 # this struct on the wire unpacked in a scoped protocol | ||||
|                 # setup with ``tractor``. | ||||
|                 # TODO: relay this order msg directly? | ||||
|                 req = Order(**cmd) | ||||
|                 broker = req.brokers[0] | ||||
| 
 | ||||
|  | @ -912,13 +1005,13 @@ async def process_client_order_cmds( | |||
|                 # aren't expectig their own name, but should they? | ||||
|                 sym = fqsn.replace(f'.{broker}', '') | ||||
| 
 | ||||
|                 if live_entry is not None: | ||||
|                     # sanity check on emsd id | ||||
|                     assert live_entry.oid == oid | ||||
|                     reqid = live_entry.reqid | ||||
|                 if status is not None: | ||||
|                     # if we already had a broker order id then | ||||
|                     # this is likely an order update commmand. | ||||
|                     log.info(f"Modifying live {broker} order: {reqid}") | ||||
|                     reqid = status.reqid | ||||
|                     status.req = req | ||||
|                     status.resp = 'pending' | ||||
| 
 | ||||
|                 msg = BrokerdOrder( | ||||
|                     oid=oid,  # no ib support for oids... | ||||
|  | @ -935,6 +1028,18 @@ async def process_client_order_cmds( | |||
|                     account=req.account, | ||||
|                 ) | ||||
| 
 | ||||
|                 if status is None: | ||||
|                     status = Status( | ||||
|                         oid=oid, | ||||
|                         reqid=reqid, | ||||
|                         resp='pending', | ||||
|                         time_ns=time.time_ns(), | ||||
|                         brokerd_msg=msg, | ||||
|                         req=req, | ||||
|                     ) | ||||
| 
 | ||||
|                 dark_book._active[oid] = status | ||||
| 
 | ||||
|                 # send request to backend | ||||
|                 # XXX: the trades data broker response loop | ||||
|                 # (``translate_and_relay_brokerd_events()`` above) will | ||||
|  | @ -950,7 +1055,7 @@ async def process_client_order_cmds( | |||
|                 # client, before that ack, when the ack does arrive we | ||||
|                 # immediately take the reqid from the broker and cancel | ||||
|                 # that live order asap. | ||||
|                 dark_book._ems_entries[oid] = msg | ||||
|                 # dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) | ||||
| 
 | ||||
|             # dark-order / alert submission | ||||
|             case { | ||||
|  | @ -966,9 +1071,11 @@ async def process_client_order_cmds( | |||
|                     # submit order to local EMS book and scan loop, | ||||
|                     # effectively a local clearing engine, which | ||||
|                     # scans for conditions and triggers matching executions | ||||
|                     exec_mode in ('dark', 'paper') | ||||
|                     exec_mode in ('dark',) | ||||
|                     or action == 'alert' | ||||
|             ): | ||||
|                 req = Order(**cmd) | ||||
| 
 | ||||
|                 # Auto-gen scanner predicate: | ||||
|                 # we automatically figure out what the alert check | ||||
|                 # condition should be based on the current first | ||||
|  | @ -1015,23 +1122,25 @@ async def process_client_order_cmds( | |||
|                 )[oid] = ( | ||||
|                     pred, | ||||
|                     tickfilter, | ||||
|                     cmd, | ||||
|                     req, | ||||
|                     percent_away, | ||||
|                     abs_diff_away | ||||
|                 ) | ||||
|                 resp = 'dark_submitted' | ||||
|                 resp = 'dark_open' | ||||
| 
 | ||||
|                 # alerts have special msgs to distinguish | ||||
|                 if action == 'alert': | ||||
|                     resp = 'alert_submitted' | ||||
|                 # if action == 'alert': | ||||
|                 #     resp = 'open' | ||||
| 
 | ||||
|                 await client_order_stream.send( | ||||
|                     Status( | ||||
|                         resp=resp, | ||||
|                         oid=oid, | ||||
|                         time_ns=time.time_ns(), | ||||
|                     ) | ||||
|                 status = Status( | ||||
|                     resp=resp, | ||||
|                     oid=oid, | ||||
|                     time_ns=time.time_ns(), | ||||
|                     req=req, | ||||
|                     src='dark', | ||||
|                 ) | ||||
|                 dark_book._active[oid] = status | ||||
|                 await client_order_stream.send(status) | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  | @ -1099,10 +1208,9 @@ async def _emsd_main( | |||
|     ): | ||||
| 
 | ||||
|         # XXX: this should be initial price quote from target provider | ||||
|         first_quote = feed.first_quotes[fqsn] | ||||
| 
 | ||||
|         book = _router.get_dark_book(broker) | ||||
|         book.lasts[fqsn] = first_quote['last'] | ||||
|         first_quote: dict = feed.first_quotes[fqsn] | ||||
|         book: _DarkBook = _router.get_dark_book(broker) | ||||
|         book.lasts[fqsn]: float = first_quote['last'] | ||||
| 
 | ||||
|         # open a stream with the brokerd backend for order | ||||
|         # flow dialogue | ||||
|  | @ -1129,12 +1237,25 @@ async def _emsd_main( | |||
|             await ems_ctx.started(( | ||||
|                 relay.positions, | ||||
|                 list(relay.accounts), | ||||
|                 book._active, | ||||
|             )) | ||||
| 
 | ||||
|             # establish 2-way stream with requesting order-client and | ||||
|             # begin handling inbound order requests and updates | ||||
|             async with ems_ctx.open_stream() as ems_client_order_stream: | ||||
| 
 | ||||
|                 # register the client side before startingn the | ||||
|                 # brokerd-side relay task to ensure the client is | ||||
|                 # delivered all exisiting open orders on startup. | ||||
|                 _router.clients.add(ems_client_order_stream) | ||||
| 
 | ||||
|                 n.start_soon( | ||||
|                     translate_and_relay_brokerd_events, | ||||
|                     broker, | ||||
|                     brokerd_stream, | ||||
|                     _router, | ||||
|                 ) | ||||
| 
 | ||||
|                 # trigger scan and exec loop | ||||
|                 n.start_soon( | ||||
|                     clear_dark_triggers, | ||||
|  | @ -1149,7 +1270,6 @@ async def _emsd_main( | |||
| 
 | ||||
|                 # start inbound (from attached client) order request processing | ||||
|                 try: | ||||
|                     _router.clients.add(ems_client_order_stream) | ||||
| 
 | ||||
|                     # main entrypoint, run here until cancelled. | ||||
|                     await process_client_order_cmds( | ||||
|  |  | |||
|  | @ -18,24 +18,92 @@ | |||
| Clearing sub-system message and protocols. | ||||
| 
 | ||||
| """ | ||||
| from typing import Optional, Union | ||||
| # from collections import ( | ||||
| #     ChainMap, | ||||
| #     deque, | ||||
| # ) | ||||
| from typing import ( | ||||
|     Optional, | ||||
|     Literal, | ||||
| ) | ||||
| 
 | ||||
| from ..data._source import Symbol | ||||
| from ..data.types import Struct | ||||
| 
 | ||||
| 
 | ||||
| # TODO: a composite for tracking msg flow on 2-legged | ||||
| # dialogs. | ||||
| # class Dialog(ChainMap): | ||||
| #     ''' | ||||
| #     Msg collection abstraction to easily track the state changes of | ||||
| #     a msg flow in one high level, query-able and immutable construct. | ||||
| 
 | ||||
| #     The main use case is to query data from a (long-running) | ||||
| #     msg-transaction-sequence | ||||
| 
 | ||||
| 
 | ||||
| #     ''' | ||||
| #     def update( | ||||
| #         self, | ||||
| #         msg, | ||||
| #     ) -> None: | ||||
| #         self.maps.insert(0, msg.to_dict()) | ||||
| 
 | ||||
| #     def flatten(self) -> dict: | ||||
| #         return dict(self) | ||||
| 
 | ||||
| 
 | ||||
| # TODO: ``msgspec`` stuff worth paying attention to: | ||||
| # - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution | ||||
| # - schema evolution: | ||||
| # https://jcristharif.com/msgspec/usage.html#schema-evolution | ||||
| # - for eg. ``BrokerdStatus``, instead just have separate messages? | ||||
| # - use literals for a common msg determined by diff keys? | ||||
| #   - https://jcristharif.com/msgspec/usage.html#literal | ||||
| #   - for eg. ``BrokerdStatus``, instead just have separate messages? | ||||
| 
 | ||||
| # -------------- | ||||
| # Client -> emsd | ||||
| # -------------- | ||||
| 
 | ||||
| class Order(Struct): | ||||
| 
 | ||||
|     # TODO: ideally we can combine these 2 fields into | ||||
|     # 1 and just use the size polarity to determine a buy/sell. | ||||
|     # i would like to see this become more like | ||||
|     # https://jcristharif.com/msgspec/usage.html#literal | ||||
|     # action: Literal[ | ||||
|     #     'live', | ||||
|     #     'dark', | ||||
|     #     'alert', | ||||
|     # ] | ||||
| 
 | ||||
|     action: Literal[ | ||||
|         'buy', | ||||
|         'sell', | ||||
|         'alert', | ||||
|     ] | ||||
|     # determines whether the create execution | ||||
|     # will be submitted to the ems or directly to | ||||
|     # the backend broker | ||||
|     exec_mode: Literal[ | ||||
|         'dark', | ||||
|         'live', | ||||
|         # 'paper',  no right? | ||||
|     ] | ||||
| 
 | ||||
|     # internal ``emdsd`` unique "order id" | ||||
|     oid: str  # uuid4 | ||||
|     symbol: str | Symbol | ||||
|     account: str  # should we set a default as '' ? | ||||
| 
 | ||||
|     price: float | ||||
|     size: float  # -ve is "sell", +ve is "buy" | ||||
| 
 | ||||
|     brokers: Optional[list[str]] = [] | ||||
| 
 | ||||
| 
 | ||||
| class Cancel(Struct): | ||||
|     '''Cancel msg for removing a dark (ems triggered) or | ||||
|     ''' | ||||
|     Cancel msg for removing a dark (ems triggered) or | ||||
|     broker-submitted (live) trigger/order. | ||||
| 
 | ||||
|     ''' | ||||
|  | @ -44,32 +112,6 @@ class Cancel(Struct): | |||
|     symbol: str | ||||
| 
 | ||||
| 
 | ||||
| class Order(Struct): | ||||
| 
 | ||||
|     # TODO: use ``msgspec.Literal`` | ||||
|     # https://jcristharif.com/msgspec/usage.html#literal | ||||
|     action: str  # {'buy', 'sell', 'alert'} | ||||
|     # internal ``emdsd`` unique "order id" | ||||
|     oid: str  # uuid4 | ||||
|     symbol: Union[str, Symbol] | ||||
|     account: str  # should we set a default as '' ? | ||||
| 
 | ||||
|     price: float | ||||
|     # TODO: could we drop the ``.action`` field above and instead just | ||||
|     # use +/- values here? Would make the msg smaller at the sake of a | ||||
|     # teensie fp precision? | ||||
|     size: float | ||||
|     brokers: list[str] | ||||
| 
 | ||||
|     # Assigned once initial ack is received | ||||
|     # ack_time_ns: Optional[int] = None | ||||
| 
 | ||||
|     # determines whether the create execution | ||||
|     # will be submitted to the ems or directly to | ||||
|     # the backend broker | ||||
|     exec_mode: str  # {'dark', 'live', 'paper'} | ||||
| 
 | ||||
| 
 | ||||
| # -------------- | ||||
| # Client <- emsd | ||||
| # -------------- | ||||
|  | @ -79,37 +121,39 @@ class Order(Struct): | |||
| class Status(Struct): | ||||
| 
 | ||||
|     name: str = 'status' | ||||
|     oid: str  # uuid4 | ||||
|     time_ns: int | ||||
|     oid: str  # uuid4 ems-order dialog id | ||||
| 
 | ||||
|     # { | ||||
|     #   'dark_submitted', | ||||
|     #   'dark_cancelled', | ||||
|     #   'dark_triggered', | ||||
| 
 | ||||
|     #   'broker_submitted', | ||||
|     #   'broker_cancelled', | ||||
|     #   'broker_executed', | ||||
|     #   'broker_filled', | ||||
|     #   'broker_errored', | ||||
| 
 | ||||
|     #   'alert_submitted', | ||||
|     #   'alert_triggered', | ||||
| 
 | ||||
|     # } | ||||
|     resp: str  # "response", see above | ||||
| 
 | ||||
|     # trigger info | ||||
|     trigger_price: Optional[float] = None | ||||
|     # price: float | ||||
| 
 | ||||
|     # broker: Optional[str] = None | ||||
|     resp: Literal[ | ||||
|       'pending',  # acked by broker but not yet open | ||||
|       'open', | ||||
|       'dark_open',  # dark/algo triggered order is open in ems clearing loop | ||||
|       'triggered',  # above triggered order sent to brokerd, or an alert closed | ||||
|       'closed',  # fully cleared all size/units | ||||
|       'fill',  # partial execution | ||||
|       'canceled', | ||||
|       'error', | ||||
|     ] | ||||
| 
 | ||||
|     # this maps normally to the ``BrokerdOrder.reqid`` below, an id | ||||
|     # normally allocated internally by the backend broker routing system | ||||
|     broker_reqid: Optional[Union[int, str]] = None | ||||
|     reqid: Optional[int | str] = None | ||||
| 
 | ||||
|     # for relaying backend msg data "through" the ems layer | ||||
|     # the (last) source order/request msg if provided | ||||
|     # (eg. the Order/Cancel which causes this msg) and | ||||
|     # acts as a back-reference to the corresponding | ||||
|     # request message which was the source of this msg. | ||||
|     req: Optional[Order | Cancel] = None | ||||
| 
 | ||||
|     # XXX: better design/name here? | ||||
|     # flag that can be set to indicate a message for an order | ||||
|     # event that wasn't originated by piker's emsd (eg. some external | ||||
|     # trading system which does it's own order control but that you | ||||
|     # might want to "track" using piker UIs/systems). | ||||
|     src: Optional[str] = None | ||||
| 
 | ||||
|     # for relaying a boxed brokerd-dialog-side msg data "through" the | ||||
|     # ems layer to clients. | ||||
|     brokerd_msg: dict = {} | ||||
| 
 | ||||
| 
 | ||||
|  | @ -131,25 +175,28 @@ class BrokerdCancel(Struct): | |||
|     # for setting a unique order id then this value will be relayed back | ||||
|     # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` | ||||
|     # field | ||||
|     reqid: Optional[Union[int, str]] = None | ||||
|     reqid: Optional[int | str] = None | ||||
| 
 | ||||
| 
 | ||||
| class BrokerdOrder(Struct): | ||||
| 
 | ||||
|     action: str  # {buy, sell} | ||||
|     oid: str | ||||
|     account: str | ||||
|     time_ns: int | ||||
| 
 | ||||
|     # TODO: if we instead rely on a +ve/-ve size to determine | ||||
|     # the action we more or less don't need this field right? | ||||
|     action: str = ''  # {buy, sell} | ||||
| 
 | ||||
|     # "broker request id": broker specific/internal order id if this is | ||||
|     # None, creates a new order otherwise if the id is valid the backend | ||||
|     # api must modify the existing matching order. If the broker allows | ||||
|     # for setting a unique order id then this value will be relayed back | ||||
|     # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` | ||||
|     # field | ||||
|     reqid: Optional[Union[int, str]] = None | ||||
|     reqid: Optional[int | str] = None | ||||
| 
 | ||||
|     symbol: str  # symbol.<providername> ? | ||||
|     symbol: str  # fqsn | ||||
|     price: float | ||||
|     size: float | ||||
| 
 | ||||
|  | @ -170,7 +217,7 @@ class BrokerdOrderAck(Struct): | |||
|     name: str = 'ack' | ||||
| 
 | ||||
|     # defined and provided by backend | ||||
|     reqid: Union[int, str] | ||||
|     reqid: int | str | ||||
| 
 | ||||
|     # emsd id originally sent in matching request msg | ||||
|     oid: str | ||||
|  | @ -180,30 +227,22 @@ class BrokerdOrderAck(Struct): | |||
| class BrokerdStatus(Struct): | ||||
| 
 | ||||
|     name: str = 'status' | ||||
|     reqid: Union[int, str] | ||||
|     reqid: int | str | ||||
|     time_ns: int | ||||
|     status: Literal[ | ||||
|         'open', | ||||
|         'canceled', | ||||
|         'fill', | ||||
|         'pending', | ||||
|         'error', | ||||
|     ] | ||||
| 
 | ||||
|     # XXX: should be best effort set for every update | ||||
|     account: str = '' | ||||
| 
 | ||||
|     # TODO: instead (ack, pending, open, fill, clos(ed), cancelled) | ||||
|     # { | ||||
|     #   'submitted', | ||||
|     #   'cancelled', | ||||
|     #   'filled', | ||||
|     # } | ||||
|     status: str | ||||
| 
 | ||||
|     account: str | ||||
|     filled: float = 0.0 | ||||
|     reason: str = '' | ||||
|     remaining: float = 0.0 | ||||
| 
 | ||||
|     # XXX: better design/name here? | ||||
|     # flag that can be set to indicate a message for an order | ||||
|     # event that wasn't originated by piker's emsd (eg. some external | ||||
|     # trading system which does it's own order control but that you | ||||
|     # might want to "track" using piker UIs/systems). | ||||
|     external: bool = False | ||||
|     # external: bool = False | ||||
| 
 | ||||
|     # XXX: not required schema as of yet | ||||
|     broker_details: dict = { | ||||
|  | @ -218,7 +257,7 @@ class BrokerdFill(Struct): | |||
| 
 | ||||
|     ''' | ||||
|     name: str = 'fill' | ||||
|     reqid: Union[int, str] | ||||
|     reqid: int | str | ||||
|     time_ns: int | ||||
| 
 | ||||
|     # order exeuction related | ||||
|  | @ -248,7 +287,7 @@ class BrokerdError(Struct): | |||
| 
 | ||||
|     # if no brokerd order request was actually submitted (eg. we errored | ||||
|     # at the ``pikerd`` layer) then there will be ``reqid`` allocated. | ||||
|     reqid: Optional[Union[int, str]] = None | ||||
|     reqid: Optional[int | str] = None | ||||
| 
 | ||||
|     symbol: str | ||||
|     reason: str | ||||
|  |  | |||
|  | @ -45,8 +45,13 @@ from ..data._normalize import iterticks | |||
| from ..data._source import unpack_fqsn | ||||
| from ..log import get_logger | ||||
| from ._messages import ( | ||||
|     BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, | ||||
|     BrokerdFill, BrokerdPosition, BrokerdError | ||||
|     BrokerdCancel, | ||||
|     BrokerdOrder, | ||||
|     BrokerdOrderAck, | ||||
|     BrokerdStatus, | ||||
|     BrokerdFill, | ||||
|     BrokerdPosition, | ||||
|     BrokerdError, | ||||
| ) | ||||
| 
 | ||||
| 
 | ||||
|  | @ -94,6 +99,10 @@ class PaperBoi: | |||
|         ''' | ||||
|         is_modify: bool = False | ||||
| 
 | ||||
|         if action == 'alert': | ||||
|             # bypass all fill simulation | ||||
|             return reqid | ||||
| 
 | ||||
|         entry = self._reqids.get(reqid) | ||||
|         if entry: | ||||
|             # order is already existing, this is a modify | ||||
|  | @ -104,10 +113,6 @@ class PaperBoi: | |||
|             # register order internally | ||||
|             self._reqids[reqid] = (oid, symbol, action, price) | ||||
| 
 | ||||
|         if action == 'alert': | ||||
|             # bypass all fill simulation | ||||
|             return reqid | ||||
| 
 | ||||
|         # TODO: net latency model | ||||
|         # we checkpoint here quickly particulalry | ||||
|         # for dark orders since we want the dark_executed | ||||
|  | @ -119,7 +124,9 @@ class PaperBoi: | |||
|             size = -size | ||||
| 
 | ||||
|         msg = BrokerdStatus( | ||||
|             status='submitted', | ||||
|             status='open', | ||||
|             # account=f'paper_{self.broker}', | ||||
|             account='paper', | ||||
|             reqid=reqid, | ||||
|             time_ns=time.time_ns(), | ||||
|             filled=0.0, | ||||
|  | @ -136,7 +143,14 @@ class PaperBoi: | |||
|             ) or ( | ||||
|             action == 'sell' and (clear_price := self.last_bid[0]) >= price | ||||
|         ): | ||||
|             await self.fake_fill(symbol, clear_price, size, action, reqid, oid) | ||||
|             await self.fake_fill( | ||||
|                 symbol, | ||||
|                 clear_price, | ||||
|                 size, | ||||
|                 action, | ||||
|                 reqid, | ||||
|                 oid, | ||||
|             ) | ||||
| 
 | ||||
|         else: | ||||
|             # register this submissions as a paper live order | ||||
|  | @ -178,7 +192,9 @@ class PaperBoi: | |||
|         await trio.sleep(0.05) | ||||
| 
 | ||||
|         msg = BrokerdStatus( | ||||
|             status='cancelled', | ||||
|             status='canceled', | ||||
|             # account=f'paper_{self.broker}', | ||||
|             account='paper', | ||||
|             reqid=reqid, | ||||
|             time_ns=time.time_ns(), | ||||
|             broker_details={'name': 'paperboi'}, | ||||
|  | @ -230,25 +246,23 @@ class PaperBoi: | |||
|         self._trade_ledger.update(fill_msg.to_dict()) | ||||
| 
 | ||||
|         if order_complete: | ||||
| 
 | ||||
|             msg = BrokerdStatus( | ||||
| 
 | ||||
|                 reqid=reqid, | ||||
|                 time_ns=time.time_ns(), | ||||
| 
 | ||||
|                 status='filled', | ||||
|                 # account=f'paper_{self.broker}', | ||||
|                 account='paper', | ||||
|                 status='closed', | ||||
|                 filled=size, | ||||
|                 remaining=0 if order_complete else remaining, | ||||
| 
 | ||||
|                 broker_details={ | ||||
|                     'paper_info': { | ||||
|                         'oid': oid, | ||||
|                     }, | ||||
|                     'action': action, | ||||
|                     'size': size, | ||||
|                     'price': price, | ||||
|                     'name': self.broker, | ||||
|                 }, | ||||
|                 # broker_details={ | ||||
|                 #     'paper_info': { | ||||
|                 #         'oid': oid, | ||||
|                 #     }, | ||||
|                 #     'action': action, | ||||
|                 #     'size': size, | ||||
|                 #     'price': price, | ||||
|                 #     'name': self.broker, | ||||
|                 # }, | ||||
|             ) | ||||
|             await self.ems_trades_stream.send(msg) | ||||
| 
 | ||||
|  | @ -393,69 +407,72 @@ async def handle_order_requests( | |||
|     # order_request: dict | ||||
|     async for request_msg in ems_order_stream: | ||||
| 
 | ||||
|         action = request_msg['action'] | ||||
| 
 | ||||
|         if action in {'buy', 'sell'}: | ||||
| 
 | ||||
|             account = request_msg['account'] | ||||
|             if account != 'paper': | ||||
|                 log.error( | ||||
|                     'This is a paper account,' | ||||
|                     ' only a `paper` selection is valid' | ||||
|                 ) | ||||
|                 await ems_order_stream.send(BrokerdError( | ||||
|                     oid=request_msg['oid'], | ||||
|                     symbol=request_msg['symbol'], | ||||
|                     reason=f'Paper only. No account found: `{account}` ?', | ||||
|                 )) | ||||
|                 continue | ||||
|         # action = request_msg['action'] | ||||
|         match request_msg: | ||||
|         # if action in {'buy', 'sell'}: | ||||
|             case {'action': ('buy' | 'sell')}: | ||||
|                 order = BrokerdOrder(**request_msg) | ||||
|                 account = order.account | ||||
|                 if account != 'paper': | ||||
|                     log.error( | ||||
|                         'This is a paper account,' | ||||
|                         ' only a `paper` selection is valid' | ||||
|                     ) | ||||
|                     await ems_order_stream.send(BrokerdError( | ||||
|                         # oid=request_msg['oid'], | ||||
|                         oid=order.oid, | ||||
|                         # symbol=request_msg['symbol'], | ||||
|                         symbol=order.symbol, | ||||
|                         reason=f'Paper only. No account found: `{account}` ?', | ||||
|                     )) | ||||
|                     continue | ||||
| 
 | ||||
|             # validate | ||||
|             order = BrokerdOrder(**request_msg) | ||||
|             # order = BrokerdOrder(**request_msg) | ||||
| 
 | ||||
|             if order.reqid is None: | ||||
|                 reqid = str(uuid.uuid4()) | ||||
|             else: | ||||
|                 reqid = order.reqid | ||||
|                 # if order.reqid is None: | ||||
|                 #     reqid =  | ||||
|                 # else: | ||||
|                 reqid = order.reqid or str(uuid.uuid4()) | ||||
| 
 | ||||
|             # deliver ack that order has been submitted to broker routing | ||||
|             await ems_order_stream.send( | ||||
|                 BrokerdOrderAck( | ||||
|                 # deliver ack that order has been submitted to broker routing | ||||
|                 await ems_order_stream.send( | ||||
|                     BrokerdOrderAck( | ||||
| 
 | ||||
|                     # ems order request id | ||||
|                     oid=order.oid, | ||||
|                         # ems order request id | ||||
|                         oid=order.oid, | ||||
| 
 | ||||
|                     # broker specific request id | ||||
|                     reqid=reqid, | ||||
|                         # broker specific request id | ||||
|                         reqid=reqid, | ||||
| 
 | ||||
|                     ) | ||||
|                 ) | ||||
|             ) | ||||
| 
 | ||||
|             # call our client api to submit the order | ||||
|             reqid = await client.submit_limit( | ||||
|                 # call our client api to submit the order | ||||
|                 reqid = await client.submit_limit( | ||||
| 
 | ||||
|                 oid=order.oid, | ||||
|                 symbol=order.symbol, | ||||
|                 price=order.price, | ||||
|                 action=order.action, | ||||
|                 size=order.size, | ||||
|                     oid=order.oid, | ||||
|                     symbol=order.symbol, | ||||
|                     price=order.price, | ||||
|                     action=order.action, | ||||
|                     size=order.size, | ||||
| 
 | ||||
|                 # XXX: by default 0 tells ``ib_insync`` methods that | ||||
|                 # there is no existing order so ask the client to create | ||||
|                 # a new one (which it seems to do by allocating an int | ||||
|                 # counter - collision prone..) | ||||
|                 reqid=reqid, | ||||
|             ) | ||||
|                     # XXX: by default 0 tells ``ib_insync`` methods that | ||||
|                     # there is no existing order so ask the client to create | ||||
|                     # a new one (which it seems to do by allocating an int | ||||
|                     # counter - collision prone..) | ||||
|                     reqid=reqid, | ||||
|                 ) | ||||
| 
 | ||||
|         elif action == 'cancel': | ||||
|             msg = BrokerdCancel(**request_msg) | ||||
|             # elif action == 'cancel': | ||||
|             case {'action': 'cancel'}: | ||||
|                 msg = BrokerdCancel(**request_msg) | ||||
|                 await client.submit_cancel( | ||||
|                     reqid=msg.reqid | ||||
|                 ) | ||||
| 
 | ||||
|             await client.submit_cancel( | ||||
|                 reqid=msg.reqid | ||||
|             ) | ||||
| 
 | ||||
|         else: | ||||
|             log.error(f'Unknown order command: {request_msg}') | ||||
|             case _: | ||||
|                 log.error(f'Unknown order command: {request_msg}') | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
|  |  | |||
|  | @ -37,7 +37,7 @@ import time | |||
| from math import isnan | ||||
| 
 | ||||
| from bidict import bidict | ||||
| import msgpack | ||||
| from msgspec.msgpack import encode, decode | ||||
| import pyqtgraph as pg | ||||
| import numpy as np | ||||
| import tractor | ||||
|  | @ -774,12 +774,13 @@ async def stream_quotes( | |||
|     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: | ||||
|         # send subs topics to server | ||||
|         resp = await ws.send_message( | ||||
|             msgpack.dumps({'streams': list(tbks.values())}) | ||||
| 
 | ||||
|             encode({'streams': list(tbks.values())}) | ||||
|         ) | ||||
|         log.info(resp) | ||||
| 
 | ||||
|         async def recv() -> dict[str, Any]: | ||||
|             return msgpack.loads((await ws.get_message()), encoding='utf-8') | ||||
|             return decode((await ws.get_message()), encoding='utf-8') | ||||
| 
 | ||||
|         streams = (await recv())['streams'] | ||||
|         log.info(f"Subscribed to {streams}") | ||||
|  |  | |||
|  | @ -18,6 +18,7 @@ | |||
| Built-in (extension) types. | ||||
| 
 | ||||
| """ | ||||
| import sys | ||||
| from typing import Optional | ||||
| from pprint import pformat | ||||
| 
 | ||||
|  | @ -42,7 +43,12 @@ class Struct( | |||
|         } | ||||
| 
 | ||||
|     def __repr__(self): | ||||
|         return f'Struct({pformat(self.to_dict())})' | ||||
|         # only turn on pprint when we detect a python REPL | ||||
|         # at runtime B) | ||||
|         if hasattr(sys, 'ps1'): | ||||
|             return f'Struct({pformat(self.to_dict())})' | ||||
| 
 | ||||
|         return super().__repr__() | ||||
| 
 | ||||
|     def copy( | ||||
|         self, | ||||
|  |  | |||
							
								
								
									
										19
									
								
								piker/pp.py
								
								
								
								
							
							
						
						
									
										19
									
								
								piker/pp.py
								
								
								
								
							|  | @ -134,6 +134,8 @@ class Position(Struct): | |||
|     # unique backend symbol id | ||||
|     bsuid: str | ||||
| 
 | ||||
|     split_ratio: Optional[int] = None | ||||
| 
 | ||||
|     # ordered record of known constituent trade messages | ||||
|     clears: dict[ | ||||
|         Union[str, int, Status],  # trade id | ||||
|  | @ -159,6 +161,9 @@ class Position(Struct): | |||
|         clears = d.pop('clears') | ||||
|         expiry = d.pop('expiry') | ||||
| 
 | ||||
|         if self.split_ratio is None: | ||||
|             d.pop('split_ratio') | ||||
| 
 | ||||
|         # TODO: we need to figure out how to have one top level | ||||
|         # listing venue here even when the backend isn't providing | ||||
|         # it via the trades ledger.. | ||||
|  | @ -384,12 +389,22 @@ class Position(Struct): | |||
|                 asize_h.append(accum_size) | ||||
|                 ppu_h.append(ppu_h[-1]) | ||||
| 
 | ||||
|         return ppu_h[-1] if ppu_h else 0 | ||||
|         final_ppu = ppu_h[-1] if ppu_h else 0 | ||||
| 
 | ||||
|         # handle any split info entered (for now) manually by user | ||||
|         if self.split_ratio is not None: | ||||
|             final_ppu /= self.split_ratio | ||||
| 
 | ||||
|         return final_ppu | ||||
| 
 | ||||
|     def calc_size(self) -> float: | ||||
|         size: float = 0 | ||||
|         for tid, entry in self.clears.items(): | ||||
|             size += entry['size'] | ||||
| 
 | ||||
|         if self.split_ratio is not None: | ||||
|             size = round(size * self.split_ratio) | ||||
| 
 | ||||
|         return size | ||||
| 
 | ||||
|     def minimize_clears( | ||||
|  | @ -848,6 +863,7 @@ def open_pps( | |||
|         size = entry['size'] | ||||
|         # TODO: remove but, handle old field name for now | ||||
|         ppu = entry.get('ppu', entry.get('be_price', 0)) | ||||
|         split_ratio = entry.get('split_ratio') | ||||
| 
 | ||||
|         expiry = entry.get('expiry') | ||||
|         if expiry: | ||||
|  | @ -857,6 +873,7 @@ def open_pps( | |||
|             Symbol.from_fqsn(fqsn, info={}), | ||||
|             size=size, | ||||
|             ppu=ppu, | ||||
|             split_ratio=split_ratio, | ||||
|             expiry=expiry, | ||||
|             bsuid=entry['bsuid'], | ||||
| 
 | ||||
|  |  | |||
|  | @ -140,9 +140,9 @@ class LineEditor: | |||
| 
 | ||||
|     ) -> LevelLine: | ||||
| 
 | ||||
|         staged_line = self._active_staged_line | ||||
|         if not staged_line: | ||||
|             raise RuntimeError("No line is currently staged!?") | ||||
|         # staged_line = self._active_staged_line | ||||
|         # if not staged_line: | ||||
|         #     raise RuntimeError("No line is currently staged!?") | ||||
| 
 | ||||
|         # for now, until submission reponse arrives | ||||
|         line.hide_labels() | ||||
|  |  | |||
|  | @ -49,16 +49,21 @@ from ._position import ( | |||
|     SettingsPane, | ||||
| ) | ||||
| from ._forms import FieldsForm | ||||
| # from ._label import FormatLabel | ||||
| from ._window import MultiStatus | ||||
| from ..clearing._messages import Order, BrokerdPosition | ||||
| from ..clearing._messages import ( | ||||
|     Order, | ||||
|     Status, | ||||
|     # BrokerdOrder, | ||||
|     # BrokerdStatus, | ||||
|     BrokerdPosition, | ||||
| ) | ||||
| from ._forms import open_form_input_handling | ||||
| 
 | ||||
| 
 | ||||
| log = get_logger(__name__) | ||||
| 
 | ||||
| 
 | ||||
| class OrderDialog(Struct): | ||||
| class Dialog(Struct): | ||||
|     ''' | ||||
|     Trade dialogue meta-data describing the lifetime | ||||
|     of an order submission to ``emsd`` from a chart. | ||||
|  | @ -141,7 +146,7 @@ class OrderMode: | |||
|     current_pp: Optional[PositionTracker] = None | ||||
|     active: bool = False | ||||
|     name: str = 'order' | ||||
|     dialogs: dict[str, OrderDialog] = field(default_factory=dict) | ||||
|     dialogs: dict[str, Dialog] = field(default_factory=dict) | ||||
| 
 | ||||
|     _colors = { | ||||
|         'alert': 'alert_yellow', | ||||
|  | @ -152,10 +157,7 @@ class OrderMode: | |||
| 
 | ||||
|     def line_from_order( | ||||
|         self, | ||||
| 
 | ||||
|         order: Order, | ||||
|         symbol: Symbol, | ||||
| 
 | ||||
|         **line_kwargs, | ||||
| 
 | ||||
|     ) -> LevelLine: | ||||
|  | @ -173,8 +175,8 @@ class OrderMode: | |||
|             color=self._colors[order.action], | ||||
| 
 | ||||
|             dotted=True if ( | ||||
|                 order.exec_mode == 'dark' and | ||||
|                 order.action != 'alert' | ||||
|                 order.exec_mode == 'dark' | ||||
|                 and order.action != 'alert' | ||||
|             ) else False, | ||||
| 
 | ||||
|             **line_kwargs, | ||||
|  | @ -236,7 +238,6 @@ class OrderMode: | |||
| 
 | ||||
|         line = self.line_from_order( | ||||
|             order, | ||||
|             symbol, | ||||
| 
 | ||||
|             show_markers=True, | ||||
|             # just for the stage line to avoid | ||||
|  | @ -262,25 +263,28 @@ class OrderMode: | |||
| 
 | ||||
|     def submit_order( | ||||
|         self, | ||||
|         send_msg: bool = True, | ||||
|         order: Optional[Order] = None, | ||||
| 
 | ||||
|     ) -> OrderDialog: | ||||
|     ) -> Dialog: | ||||
|         ''' | ||||
|         Send execution order to EMS return a level line to | ||||
|         represent the order on a chart. | ||||
| 
 | ||||
|         ''' | ||||
|         staged = self._staged_order | ||||
|         symbol: Symbol = staged.symbol | ||||
|         oid = str(uuid.uuid4()) | ||||
|         if not order: | ||||
|             staged = self._staged_order | ||||
|             oid = str(uuid.uuid4()) | ||||
|             # symbol: Symbol = staged.symbol | ||||
| 
 | ||||
|         # format order data for ems | ||||
|         order = staged.copy() | ||||
|         order.oid = oid | ||||
|         order.symbol = symbol.front_fqsn() | ||||
|             # format order data for ems | ||||
|             order = staged.copy() | ||||
|             order.oid = oid | ||||
| 
 | ||||
|         order.symbol = order.symbol.front_fqsn() | ||||
| 
 | ||||
|         line = self.line_from_order( | ||||
|             order, | ||||
|             symbol, | ||||
| 
 | ||||
|             show_markers=True, | ||||
|             only_show_markers_on_hover=True, | ||||
|  | @ -298,17 +302,17 @@ class OrderMode: | |||
|         # color once the submission ack arrives. | ||||
|         self.lines.submit_line( | ||||
|             line=line, | ||||
|             uuid=oid, | ||||
|             uuid=order.oid, | ||||
|         ) | ||||
| 
 | ||||
|         dialog = OrderDialog( | ||||
|             uuid=oid, | ||||
|         dialog = Dialog( | ||||
|             uuid=order.oid, | ||||
|             order=order, | ||||
|             symbol=symbol, | ||||
|             symbol=order.symbol, | ||||
|             line=line, | ||||
|             last_status_close=self.multistatus.open_status( | ||||
|                 f'submitting {self._trigger_type}-{order.action}', | ||||
|                 final_msg=f'submitted {self._trigger_type}-{order.action}', | ||||
|                 f'submitting {order.exec_mode}-{order.action}', | ||||
|                 final_msg=f'submitted {order.exec_mode}-{order.action}', | ||||
|                 clear_on_next=True, | ||||
|             ) | ||||
|         ) | ||||
|  | @ -318,14 +322,21 @@ class OrderMode: | |||
| 
 | ||||
|         # enter submission which will be popped once a response | ||||
|         # from the EMS is received to move the order to a different# status | ||||
|         self.dialogs[oid] = dialog | ||||
|         self.dialogs[order.oid] = dialog | ||||
| 
 | ||||
|         # hook up mouse drag handlers | ||||
|         line._on_drag_start = self.order_line_modify_start | ||||
|         line._on_drag_end = self.order_line_modify_complete | ||||
| 
 | ||||
|         # send order cmd to ems | ||||
|         self.book.send(order) | ||||
|         if send_msg: | ||||
|             self.book.send(order) | ||||
|         else: | ||||
|             # just register for control over this order | ||||
|             # TODO: some kind of mini-perms system here based on | ||||
|             # an out-of-band tagging/auth sub-sys for multiplayer | ||||
|             # order control? | ||||
|             self.book._sent_orders[order.oid] = order | ||||
| 
 | ||||
|         return dialog | ||||
| 
 | ||||
|  | @ -363,7 +374,7 @@ class OrderMode: | |||
|         self, | ||||
|         uuid: str | ||||
| 
 | ||||
|     ) -> OrderDialog: | ||||
|     ) -> Dialog: | ||||
|         ''' | ||||
|         Order submitted status event handler. | ||||
| 
 | ||||
|  | @ -418,7 +429,7 @@ class OrderMode: | |||
|         self, | ||||
| 
 | ||||
|         uuid: str, | ||||
|         msg: Dict[str, Any], | ||||
|         msg: Status, | ||||
| 
 | ||||
|     ) -> None: | ||||
| 
 | ||||
|  | @ -442,7 +453,7 @@ class OrderMode: | |||
| 
 | ||||
|                 # TODO: add in standard fill/exec info that maybe we | ||||
|                 # pack in a broker independent way? | ||||
|                 f'{msg["resp"]}: {msg["trigger_price"]}', | ||||
|                 f'{msg.resp}: {msg.req.price}', | ||||
|             ], | ||||
|         ) | ||||
|         log.runtime(result) | ||||
|  | @ -502,7 +513,7 @@ class OrderMode: | |||
|                     oid = dialog.uuid | ||||
| 
 | ||||
|                     cancel_status_close = self.multistatus.open_status( | ||||
|                         f'cancelling order {oid[:6]}', | ||||
|                         f'cancelling order {oid}', | ||||
|                         group_key=key, | ||||
|                     ) | ||||
|                     dialog.last_status_close = cancel_status_close | ||||
|  | @ -512,6 +523,45 @@ class OrderMode: | |||
| 
 | ||||
|         return ids | ||||
| 
 | ||||
|     def load_unknown_dialog_from_msg( | ||||
|         self, | ||||
|         msg: Status, | ||||
| 
 | ||||
|     ) -> Dialog: | ||||
| 
 | ||||
|         # NOTE: the `.order` attr **must** be set with the | ||||
|         # equivalent order msg in order to be loaded. | ||||
|         order = Order(**msg.req) | ||||
|         oid = str(msg.oid) | ||||
|         symbol = order.symbol | ||||
| 
 | ||||
|         # TODO: MEGA UGGG ZONEEEE! | ||||
|         src = msg.src | ||||
|         if ( | ||||
|             src | ||||
|             and src != 'dark' | ||||
|             and src not in symbol | ||||
|         ): | ||||
|             fqsn = symbol + '.' + src | ||||
|             brokername = src | ||||
|         else: | ||||
|             fqsn = symbol | ||||
|             *head, brokername = fqsn.rsplit('.') | ||||
| 
 | ||||
|         # fill out complex fields | ||||
|         order.oid = str(order.oid) | ||||
|         order.brokers = [brokername] | ||||
|         order.symbol = Symbol.from_fqsn( | ||||
|             fqsn=fqsn, | ||||
|             info={}, | ||||
|         ) | ||||
|         dialog = self.submit_order( | ||||
|             send_msg=False, | ||||
|             order=order, | ||||
|         ) | ||||
|         assert self.dialogs[oid] == dialog | ||||
|         return dialog | ||||
| 
 | ||||
| 
 | ||||
| @asynccontextmanager | ||||
| async def open_order_mode( | ||||
|  | @ -549,6 +599,7 @@ async def open_order_mode( | |||
|             trades_stream, | ||||
|             position_msgs, | ||||
|             brokerd_accounts, | ||||
|             ems_dialog_msgs, | ||||
|         ), | ||||
|         trio.open_nursery() as tn, | ||||
| 
 | ||||
|  | @ -596,10 +647,10 @@ async def open_order_mode( | |||
| 
 | ||||
|                 sym = msg['symbol'] | ||||
|                 if ( | ||||
|                     sym == symkey or | ||||
|                     # mega-UGH, i think we need to fix the FQSN stuff sooner | ||||
|                     # then later.. | ||||
|                     sym == symkey.removesuffix(f'.{broker}') | ||||
|                     (sym == symkey) or ( | ||||
|                         # mega-UGH, i think we need to fix the FQSN | ||||
|                         # stuff sooner then later.. | ||||
|                         sym == symkey.removesuffix(f'.{broker}')) | ||||
|                 ): | ||||
|                     pps_by_account[acctid] = msg | ||||
| 
 | ||||
|  | @ -653,7 +704,7 @@ async def open_order_mode( | |||
|         # setup order mode sidepane widgets | ||||
|         form: FieldsForm = chart.sidepane | ||||
|         form.vbox.setSpacing( | ||||
|             int((1 + 5/8)*_font.px_size) | ||||
|             int((1 + 5 / 8) * _font.px_size) | ||||
|         ) | ||||
| 
 | ||||
|         from ._feedstatus import mk_feed_label | ||||
|  | @ -703,7 +754,7 @@ async def open_order_mode( | |||
|         order_pane.order_mode = mode | ||||
| 
 | ||||
|         # select a pp to track | ||||
|         tracker = trackers[pp_account] | ||||
|         tracker: PositionTracker = trackers[pp_account] | ||||
|         mode.current_pp = tracker | ||||
|         tracker.show() | ||||
|         tracker.hide_info() | ||||
|  | @ -755,151 +806,181 @@ async def open_order_mode( | |||
|             # to handle input since the ems connection is ready | ||||
|             started.set() | ||||
| 
 | ||||
|             for oid, msg in ems_dialog_msgs.items(): | ||||
| 
 | ||||
|                 # HACK ALERT: ensure a resp field is filled out since | ||||
|                 # techincally the call below expects a ``Status``. TODO: | ||||
|                 # parse into proper ``Status`` equivalents ems-side? | ||||
|                 # msg.setdefault('resp', msg['broker_details']['resp']) | ||||
|                 # msg.setdefault('oid', msg['broker_details']['oid']) | ||||
|                 msg['brokerd_msg'] = msg | ||||
| 
 | ||||
|                 await process_trade_msg( | ||||
|                     mode, | ||||
|                     book, | ||||
|                     msg, | ||||
|                 ) | ||||
| 
 | ||||
|             tn.start_soon( | ||||
|                 process_trades_and_update_ui, | ||||
|                 tn, | ||||
|                 feed, | ||||
|                 mode, | ||||
|                 trades_stream, | ||||
|                 mode, | ||||
|                 book, | ||||
|             ) | ||||
| 
 | ||||
|             yield mode | ||||
| 
 | ||||
| 
 | ||||
| async def process_trades_and_update_ui( | ||||
| 
 | ||||
|     n: trio.Nursery, | ||||
|     feed: Feed, | ||||
|     mode: OrderMode, | ||||
|     trades_stream: tractor.MsgStream, | ||||
|     mode: OrderMode, | ||||
|     book: OrderBook, | ||||
| 
 | ||||
| ) -> None: | ||||
| 
 | ||||
|     get_index = mode.chart.get_index | ||||
|     global _pnl_tasks | ||||
| 
 | ||||
|     # this is where we receive **back** messages | ||||
|     # about executions **from** the EMS actor | ||||
|     async for msg in trades_stream: | ||||
|         await process_trade_msg( | ||||
|             mode, | ||||
|             book, | ||||
|             msg, | ||||
|         ) | ||||
| 
 | ||||
|         fmsg = pformat(msg) | ||||
|         log.info(f'Received order msg:\n{fmsg}') | ||||
| 
 | ||||
|         name = msg['name'] | ||||
|         if name in ( | ||||
|             'position', | ||||
| async def process_trade_msg( | ||||
|     mode: OrderMode, | ||||
|     book: OrderBook, | ||||
|     msg: dict, | ||||
| 
 | ||||
| ) -> tuple[Dialog, Status]: | ||||
| 
 | ||||
|     get_index = mode.chart.get_index | ||||
|     fmsg = pformat(msg) | ||||
|     log.debug(f'Received order msg:\n{fmsg}') | ||||
|     name = msg['name'] | ||||
| 
 | ||||
|     if name in ( | ||||
|         'position', | ||||
|     ): | ||||
|         sym = mode.chart.linked.symbol | ||||
|         pp_msg_symbol = msg['symbol'].lower() | ||||
|         fqsn = sym.front_fqsn() | ||||
|         broker, key = sym.front_feed() | ||||
|         if ( | ||||
|             pp_msg_symbol == fqsn | ||||
|             or pp_msg_symbol == fqsn.removesuffix(f'.{broker}') | ||||
|         ): | ||||
|             sym = mode.chart.linked.symbol | ||||
|             pp_msg_symbol = msg['symbol'].lower() | ||||
|             fqsn = sym.front_fqsn() | ||||
|             broker, key = sym.front_feed() | ||||
|             if ( | ||||
|                 pp_msg_symbol == fqsn | ||||
|                 or pp_msg_symbol == fqsn.removesuffix(f'.{broker}') | ||||
|             ): | ||||
|                 log.info(f'{fqsn} matched pp msg: {fmsg}') | ||||
|                 tracker = mode.trackers[msg['account']] | ||||
|                 tracker.live_pp.update_from_msg(msg) | ||||
|                 # update order pane widgets | ||||
|                 tracker.update_from_pp() | ||||
|                 mode.pane.update_status_ui(tracker) | ||||
|             log.info(f'{fqsn} matched pp msg: {fmsg}') | ||||
|             tracker = mode.trackers[msg['account']] | ||||
|             tracker.live_pp.update_from_msg(msg) | ||||
|             # update order pane widgets | ||||
|             tracker.update_from_pp() | ||||
|             mode.pane.update_status_ui(tracker) | ||||
| 
 | ||||
|                 if tracker.live_pp.size: | ||||
|                     # display pnl | ||||
|                     mode.pane.display_pnl(tracker) | ||||
|             if tracker.live_pp.size: | ||||
|                 # display pnl | ||||
|                 mode.pane.display_pnl(tracker) | ||||
| 
 | ||||
|             # short circuit to next msg to avoid | ||||
|             # unnecessary msg content lookups | ||||
|             continue | ||||
|         # short circuit to next msg to avoid | ||||
|         # unnecessary msg content lookups | ||||
|         return | ||||
| 
 | ||||
|         resp = msg['resp'] | ||||
|         oid = msg['oid'] | ||||
|     msg = Status(**msg) | ||||
|     resp = msg.resp | ||||
|     oid = msg.oid | ||||
|     dialog: Dialog = mode.dialogs.get(oid) | ||||
| 
 | ||||
|         dialog = mode.dialogs.get(oid) | ||||
|         if dialog is None: | ||||
|             log.warning(f'received msg for untracked dialog:\n{fmsg}') | ||||
|     match msg: | ||||
|         case Status(resp='dark_open' | 'open'): | ||||
| 
 | ||||
|             # TODO: enable pure tracking / mirroring of dialogs | ||||
|             # is desired. | ||||
|             continue | ||||
|             if dialog is not None: | ||||
|                 # show line label once order is live | ||||
|                 mode.on_submit(oid) | ||||
| 
 | ||||
|         # record message to dialog tracking | ||||
|         dialog.msgs[oid] = msg | ||||
|             else: | ||||
|                 log.warning( | ||||
|                     f'received msg for untracked dialog:\n{fmsg}' | ||||
|                 ) | ||||
|                 assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}' | ||||
| 
 | ||||
|         # response to 'action' request (buy/sell) | ||||
|         if resp in ( | ||||
|             'dark_submitted', | ||||
|             'broker_submitted' | ||||
|         ): | ||||
|                 sym = mode.chart.linked.symbol | ||||
|                 fqsn = sym.front_fqsn() | ||||
|                 order = Order(**msg.req) | ||||
|                 if ( | ||||
|                     ((order.symbol + f'.{msg.src}') == fqsn) | ||||
| 
 | ||||
|             # show line label once order is live | ||||
|             mode.on_submit(oid) | ||||
|                     # a existing dark order for the same symbol | ||||
|                     or ( | ||||
|                         order.symbol == fqsn | ||||
|                         and (msg.src == 'dark') or (msg.src in fqsn) | ||||
|                     ) | ||||
|                 ): | ||||
|                     dialog = mode.load_unknown_dialog_from_msg(msg) | ||||
|                     mode.on_submit(oid) | ||||
|                     # return dialog, msg | ||||
| 
 | ||||
|         # resp to 'cancel' request or error condition | ||||
|         # for action request | ||||
|         elif resp in ( | ||||
|             'broker_inactive', | ||||
|             'broker_errored', | ||||
|         ): | ||||
|         case Status(resp='error'): | ||||
|             # delete level line from view | ||||
|             mode.on_cancel(oid) | ||||
|             broker_msg = msg['brokerd_msg'] | ||||
|             broker_msg = msg.brokerd_msg | ||||
|             log.error( | ||||
|                 f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' | ||||
|             ) | ||||
| 
 | ||||
|         elif resp in ( | ||||
|             'broker_cancelled', | ||||
|             'dark_cancelled' | ||||
|         ): | ||||
|         case Status(resp='canceled'): | ||||
|             # delete level line from view | ||||
|             mode.on_cancel(oid) | ||||
|             broker_msg = msg['brokerd_msg'] | ||||
|             log.cancel( | ||||
|                 f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' | ||||
|             ) | ||||
|             req = Order(**msg.req) | ||||
|             log.cancel(f'Canceled {req.action}:{oid}') | ||||
| 
 | ||||
|         elif resp in ( | ||||
|             'dark_triggered' | ||||
|         case Status( | ||||
|             resp='triggered', | ||||
|             # req=Order(exec_mode='dark')  # TODO: | ||||
|             req={'exec_mode': 'dark'}, | ||||
|         ): | ||||
|             # TODO: UX for a "pending" clear/live order | ||||
|             log.info(f'Dark order triggered for {fmsg}') | ||||
| 
 | ||||
|         elif resp in ( | ||||
|             'alert_triggered' | ||||
|         case Status( | ||||
|             resp='triggered', | ||||
|             # req=Order(exec_mode='live', action='alert') as req, # TODO | ||||
|             req={'exec_mode': 'live', 'action': 'alert'} as req, | ||||
|         ): | ||||
|             # should only be one "fill" for an alert | ||||
|             # add a triangle and remove the level line | ||||
|             req = Order(**req) | ||||
|             mode.on_fill( | ||||
|                 oid, | ||||
|                 price=msg['trigger_price'], | ||||
|                 price=req.price, | ||||
|                 arrow_index=get_index(time.time()), | ||||
|             ) | ||||
|             mode.lines.remove_line(uuid=oid) | ||||
|             msg.req = req | ||||
|             await mode.on_exec(oid, msg) | ||||
| 
 | ||||
|         # response to completed 'action' request for buy/sell | ||||
|         elif resp in ( | ||||
|             'broker_executed', | ||||
|         # response to completed 'dialog' for order request | ||||
|         case Status( | ||||
|             resp='closed', | ||||
|             # req=Order() as req,  # TODO | ||||
|             req=req, | ||||
|         ): | ||||
|             # right now this is just triggering a system alert | ||||
|             msg.req = Order(**req) | ||||
|             await mode.on_exec(oid, msg) | ||||
| 
 | ||||
|             if msg['brokerd_msg']['remaining'] == 0: | ||||
|                 mode.lines.remove_line(uuid=oid) | ||||
|             mode.lines.remove_line(uuid=oid) | ||||
| 
 | ||||
|         # each clearing tick is responded individually | ||||
|         elif resp in ( | ||||
|             'broker_filled', | ||||
|         ): | ||||
|         case Status(resp='fill'): | ||||
| 
 | ||||
|             # handle out-of-piker fills reporting? | ||||
|             known_order = book._sent_orders.get(oid) | ||||
|             if not known_order: | ||||
|                 log.warning(f'order {oid} is unknown') | ||||
|                 continue | ||||
|                 return | ||||
| 
 | ||||
|             action = known_order.action | ||||
|             details = msg['brokerd_msg'] | ||||
|             details = msg.brokerd_msg | ||||
| 
 | ||||
|             # TODO: some kinda progress system | ||||
|             mode.on_fill( | ||||
|  | @ -914,3 +995,9 @@ async def process_trades_and_update_ui( | |||
|             # TODO: how should we look this up? | ||||
|             # tracker = mode.trackers[msg['account']] | ||||
|             # tracker.live_pp.fills.append(msg) | ||||
| 
 | ||||
|     # record message to dialog tracking | ||||
|     if dialog: | ||||
|         dialog.msgs[oid] = msg | ||||
| 
 | ||||
|     return dialog, msg | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue