Compare commits
	
		
			21 Commits 
		
	
	
		
			gitea_feat
			...
			dict_diffe
		
	
	| Author | SHA1 | Date | 
|---|---|---|
|  | 2f6e3ad03f | |
|  | b75683879a | |
|  | db8a3dd1b7 | |
|  | 2d92ed2052 | |
|  | 0756cb0289 | |
|  | 66f7dd9020 | |
|  | 9782107153 | |
|  | 1f43f660fe | |
|  | d3b7d0e247 | |
|  | 700dbf0e2b | |
|  | b52c4092f3 | |
|  | 7fe3e3f482 | |
|  | bbbdcad33b | |
|  | a3812cd169 | |
|  | 5ac5743c66 | |
|  | aa204228ab | |
|  | 0bd8f2bcd9 | |
|  | 334f512ad3 | |
|  | 71cca4ceda | |
|  | 0d332427e2 | |
|  | 02980282cd | 
|  | @ -36,8 +36,6 @@ from trio_typing import TaskStatus | ||||||
| import tractor | import tractor | ||||||
| from ib_insync.contract import ( | from ib_insync.contract import ( | ||||||
|     Contract, |     Contract, | ||||||
|     # Option, |  | ||||||
|     # Forex, |  | ||||||
| ) | ) | ||||||
| from ib_insync.order import ( | from ib_insync.order import ( | ||||||
|     Trade, |     Trade, | ||||||
|  | @ -61,6 +59,8 @@ from piker.pp import ( | ||||||
| ) | ) | ||||||
| from piker.log import get_console_log | from piker.log import get_console_log | ||||||
| from piker.clearing._messages import ( | from piker.clearing._messages import ( | ||||||
|  |     Order, | ||||||
|  |     Status, | ||||||
|     BrokerdOrder, |     BrokerdOrder, | ||||||
|     BrokerdOrderAck, |     BrokerdOrderAck, | ||||||
|     BrokerdStatus, |     BrokerdStatus, | ||||||
|  | @ -123,11 +123,13 @@ async def handle_order_requests( | ||||||
|                 f'An IB account number for name {account} is not found?\n' |                 f'An IB account number for name {account} is not found?\n' | ||||||
|                 'Make sure you have all TWS and GW instances running.' |                 'Make sure you have all TWS and GW instances running.' | ||||||
|             ) |             ) | ||||||
|             await ems_order_stream.send(BrokerdError( |             await ems_order_stream.send( | ||||||
|  |                 BrokerdError( | ||||||
|                     oid=request_msg['oid'], |                     oid=request_msg['oid'], | ||||||
|                     symbol=request_msg['symbol'], |                     symbol=request_msg['symbol'], | ||||||
|                     reason=f'No account found: `{account}` ?', |                     reason=f'No account found: `{account}` ?', | ||||||
|             )) |                 ) | ||||||
|  |             ) | ||||||
|             continue |             continue | ||||||
| 
 | 
 | ||||||
|         client = _accounts2clients.get(account) |         client = _accounts2clients.get(account) | ||||||
|  | @ -147,6 +149,14 @@ async def handle_order_requests( | ||||||
|             # validate |             # validate | ||||||
|             order = BrokerdOrder(**request_msg) |             order = BrokerdOrder(**request_msg) | ||||||
| 
 | 
 | ||||||
|  |             # XXX: by default 0 tells ``ib_insync`` methods that | ||||||
|  |             # there is no existing order so ask the client to create | ||||||
|  |             # a new one (which it seems to do by allocating an int | ||||||
|  |             # counter - collision prone..) | ||||||
|  |             reqid = order.reqid | ||||||
|  |             if reqid is not None: | ||||||
|  |                 reqid = int(reqid) | ||||||
|  | 
 | ||||||
|             # call our client api to submit the order |             # call our client api to submit the order | ||||||
|             reqid = client.submit_limit( |             reqid = client.submit_limit( | ||||||
|                 oid=order.oid, |                 oid=order.oid, | ||||||
|  | @ -155,12 +165,7 @@ async def handle_order_requests( | ||||||
|                 action=order.action, |                 action=order.action, | ||||||
|                 size=order.size, |                 size=order.size, | ||||||
|                 account=acct_number, |                 account=acct_number, | ||||||
| 
 |                 reqid=reqid, | ||||||
|                 # XXX: by default 0 tells ``ib_insync`` methods that |  | ||||||
|                 # there is no existing order so ask the client to create |  | ||||||
|                 # a new one (which it seems to do by allocating an int |  | ||||||
|                 # counter - collision prone..) |  | ||||||
|                 reqid=order.reqid, |  | ||||||
|             ) |             ) | ||||||
|             if reqid is None: |             if reqid is None: | ||||||
|                 await ems_order_stream.send(BrokerdError( |                 await ems_order_stream.send(BrokerdError( | ||||||
|  | @ -180,9 +185,9 @@ async def handle_order_requests( | ||||||
|                 ) |                 ) | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         elif action == 'cancel': |         if action == 'cancel': | ||||||
|             msg = BrokerdCancel(**request_msg) |             msg = BrokerdCancel(**request_msg) | ||||||
|             client.submit_cancel(reqid=msg.reqid) |             client.submit_cancel(reqid=int(msg.reqid)) | ||||||
| 
 | 
 | ||||||
|         else: |         else: | ||||||
|             log.error(f'Unknown order command: {request_msg}') |             log.error(f'Unknown order command: {request_msg}') | ||||||
|  | @ -357,11 +362,24 @@ async def update_and_audit_msgs( | ||||||
|                 # presume we're at least not more in the shit then we |                 # presume we're at least not more in the shit then we | ||||||
|                 # thought. |                 # thought. | ||||||
|                 if diff: |                 if diff: | ||||||
|  |                     reverse_split_ratio = pikersize / ibsize | ||||||
|  |                     split_ratio = 1/reverse_split_ratio | ||||||
|  | 
 | ||||||
|  |                     if split_ratio >= reverse_split_ratio: | ||||||
|  |                         entry = f'split_ratio = {int(split_ratio)}' | ||||||
|  |                     else: | ||||||
|  |                         entry = f'split_ratio = 1/{int(reverse_split_ratio)}' | ||||||
|  | 
 | ||||||
|                     raise ValueError( |                     raise ValueError( | ||||||
|                         f'POSITION MISMATCH ib <-> piker ledger:\n' |                         f'POSITION MISMATCH ib <-> piker ledger:\n' | ||||||
|                         f'ib: {ibppmsg}\n' |                         f'ib: {ibppmsg}\n' | ||||||
|                         f'piker: {msg}\n' |                         f'piker: {msg}\n' | ||||||
|                         'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' |                         f'reverse_split_ratio: {reverse_split_ratio}\n' | ||||||
|  |                         f'split_ratio: {split_ratio}\n\n' | ||||||
|  |                         'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n' | ||||||
|  |                         'If you are expecting a (reverse) split in this ' | ||||||
|  |                         'instrument you should probably put the following ' | ||||||
|  |                         f'in the `pps.toml` section:\n{entry}' | ||||||
|                     ) |                     ) | ||||||
|                     msg.size = ibsize |                     msg.size = ibsize | ||||||
| 
 | 
 | ||||||
|  | @ -439,7 +457,6 @@ async def trades_dialogue( | ||||||
|     # we might also want to delegate a specific actor for |     # we might also want to delegate a specific actor for | ||||||
|     # ledger writing / reading for speed? |     # ledger writing / reading for speed? | ||||||
|     async with ( |     async with ( | ||||||
|         # trio.open_nursery() as nurse, |  | ||||||
|         open_client_proxies() as (proxies, aioclients), |         open_client_proxies() as (proxies, aioclients), | ||||||
|     ): |     ): | ||||||
|         # Open a trade ledgers stack for appending trade records over |         # Open a trade ledgers stack for appending trade records over | ||||||
|  | @ -468,6 +485,52 @@ async def trades_dialogue( | ||||||
| 
 | 
 | ||||||
|                 client = aioclients[account] |                 client = aioclients[account] | ||||||
| 
 | 
 | ||||||
|  |                 trades: list[Trade] = client.ib.openTrades() | ||||||
|  |                 order_msgs = [] | ||||||
|  |                 for trade in trades: | ||||||
|  | 
 | ||||||
|  |                     order = trade.order | ||||||
|  |                     quant = trade.order.totalQuantity | ||||||
|  |                     action = order.action.lower() | ||||||
|  |                     size = { | ||||||
|  |                         'sell': -1, | ||||||
|  |                         'buy': 1, | ||||||
|  |                     }[action] * quant | ||||||
|  |                     con = trade.contract | ||||||
|  | 
 | ||||||
|  |                     # TODO: in the case of the SMART venue (aka ib's | ||||||
|  |                     # router-clearing sys) we probably should handle | ||||||
|  |                     # showing such orders overtop of the fqsn for the | ||||||
|  |                     # primary exchange, how to map this easily is going | ||||||
|  |                     # to be a bit tricky though? | ||||||
|  |                     deats = await proxy.con_deats(contracts=[con]) | ||||||
|  |                     fqsn = list(deats)[0] | ||||||
|  | 
 | ||||||
|  |                     reqid = order.orderId | ||||||
|  | 
 | ||||||
|  |                     # TODO: maybe embed a ``BrokerdOrder`` instead | ||||||
|  |                     # since then we can directly load it on the client | ||||||
|  |                     # side in the order mode loop? | ||||||
|  |                     msg = Status( | ||||||
|  |                         time_ns=time.time_ns(), | ||||||
|  |                         resp='open', | ||||||
|  |                         oid=str(reqid), | ||||||
|  |                         reqid=reqid, | ||||||
|  | 
 | ||||||
|  |                         # embedded order info | ||||||
|  |                         req=Order( | ||||||
|  |                             action=action, | ||||||
|  |                             exec_mode='live', | ||||||
|  |                             oid=str(reqid), | ||||||
|  |                             symbol=fqsn, | ||||||
|  |                             account=accounts_def.inverse[order.account], | ||||||
|  |                             price=order.lmtPrice, | ||||||
|  |                             size=size, | ||||||
|  |                         ), | ||||||
|  |                         src='ib', | ||||||
|  |                     ) | ||||||
|  |                     order_msgs.append(msg) | ||||||
|  | 
 | ||||||
|                 # process pp value reported from ib's system. we only use these |                 # process pp value reported from ib's system. we only use these | ||||||
|                 # to cross-check sizing since average pricing on their end uses |                 # to cross-check sizing since average pricing on their end uses | ||||||
|                 # the so called (bs) "FIFO" style which more or less results in |                 # the so called (bs) "FIFO" style which more or less results in | ||||||
|  | @ -480,6 +543,7 @@ async def trades_dialogue( | ||||||
|                     # sure know which positions to update from the ledger if |                     # sure know which positions to update from the ledger if | ||||||
|                     # any are missing from the ``pps.toml`` |                     # any are missing from the ``pps.toml`` | ||||||
|                     bsuid, msg = pack_position(pos) |                     bsuid, msg = pack_position(pos) | ||||||
|  | 
 | ||||||
|                     acctid = msg.account = accounts_def.inverse[msg.account] |                     acctid = msg.account = accounts_def.inverse[msg.account] | ||||||
|                     acctid = acctid.strip('ib.') |                     acctid = acctid.strip('ib.') | ||||||
|                     cids2pps[(acctid, bsuid)] = msg |                     cids2pps[(acctid, bsuid)] = msg | ||||||
|  | @ -493,9 +557,7 @@ async def trades_dialogue( | ||||||
|                         or pp.size != msg.size |                         or pp.size != msg.size | ||||||
|                     ): |                     ): | ||||||
|                         trans = norm_trade_records(ledger) |                         trans = norm_trade_records(ledger) | ||||||
|                         updated = table.update_from_trans(trans) |                         table.update_from_trans(trans) | ||||||
|                         pp = updated[bsuid] |  | ||||||
| 
 |  | ||||||
|                         # update trades ledgers for all accounts from connected |                         # update trades ledgers for all accounts from connected | ||||||
|                         # api clients which report trades for **this session**. |                         # api clients which report trades for **this session**. | ||||||
|                         trades = await proxy.trades() |                         trades = await proxy.trades() | ||||||
|  | @ -521,9 +583,28 @@ async def trades_dialogue( | ||||||
|                             trans = trans_by_acct.get(acctid) |                             trans = trans_by_acct.get(acctid) | ||||||
|                             if trans: |                             if trans: | ||||||
|                                 table.update_from_trans(trans) |                                 table.update_from_trans(trans) | ||||||
|                                 updated = table.update_from_trans(trans) |  | ||||||
| 
 | 
 | ||||||
|                         assert msg.size == pp.size, 'WTF' |                         # XXX: not sure exactly why it wouldn't be in | ||||||
|  |                         # the updated output (maybe this is a bug?) but | ||||||
|  |                         # if you create a pos from TWS and then load it | ||||||
|  |                         # from the api trades it seems we get a key | ||||||
|  |                         # error from ``update[bsuid]`` ? | ||||||
|  |                         pp = table.pps.get(bsuid) | ||||||
|  |                         if not pp: | ||||||
|  |                             log.error( | ||||||
|  |                                 f'The contract id for {msg} may have ' | ||||||
|  |                                 f'changed to {bsuid}\nYou may need to ' | ||||||
|  |                                 'adjust your ledger for this, skipping ' | ||||||
|  |                                 'for now.' | ||||||
|  |                             ) | ||||||
|  |                             continue | ||||||
|  | 
 | ||||||
|  |                         if msg.size != pp.size: | ||||||
|  |                             log.error( | ||||||
|  |                                 'Position mismatch {pp.symbol.front_fqsn()}:\n' | ||||||
|  |                                 f'ib: {msg.size}\n' | ||||||
|  |                                 f'piker: {pp.size}\n' | ||||||
|  |                             ) | ||||||
| 
 | 
 | ||||||
|                 active_pps, closed_pps = table.dump_active() |                 active_pps, closed_pps = table.dump_active() | ||||||
| 
 | 
 | ||||||
|  | @ -575,6 +656,10 @@ async def trades_dialogue( | ||||||
|                 ctx.open_stream() as ems_stream, |                 ctx.open_stream() as ems_stream, | ||||||
|                 trio.open_nursery() as n, |                 trio.open_nursery() as n, | ||||||
|             ): |             ): | ||||||
|  |                 # relay existing open orders to ems | ||||||
|  |                 for msg in order_msgs: | ||||||
|  |                     await ems_stream.send(msg) | ||||||
|  | 
 | ||||||
|                 trade_event_stream = await n.start(open_trade_event_stream) |                 trade_event_stream = await n.start(open_trade_event_stream) | ||||||
|                 clients.append((client, trade_event_stream)) |                 clients.append((client, trade_event_stream)) | ||||||
| 
 | 
 | ||||||
|  | @ -586,6 +671,7 @@ async def trades_dialogue( | ||||||
|                 for client, stream in clients: |                 for client, stream in clients: | ||||||
|                     n.start_soon( |                     n.start_soon( | ||||||
|                         deliver_trade_events, |                         deliver_trade_events, | ||||||
|  |                         n, | ||||||
|                         stream, |                         stream, | ||||||
|                         ems_stream, |                         ems_stream, | ||||||
|                         accounts_def, |                         accounts_def, | ||||||
|  | @ -661,8 +747,24 @@ async def emit_pp_update( | ||||||
|     await ems_stream.send(msg) |     await ems_stream.send(msg) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | _statuses: dict[str, str] = { | ||||||
|  |     'cancelled': 'canceled', | ||||||
|  |     'submitted': 'open', | ||||||
|  |     # XXX: just pass these through? it duplicates actual fill events other | ||||||
|  |     # then the case where you the `.remaining == 0` case which is our | ||||||
|  |     # 'closed'` case. | ||||||
|  |     # 'filled': 'pending', | ||||||
|  |     # 'pendingsubmit': 'pending', | ||||||
|  | 
 | ||||||
|  |     # TODO: see a current ``ib_insync`` issue around this: | ||||||
|  |     # https://github.com/erdewit/ib_insync/issues/363 | ||||||
|  |     'inactive': 'pending', | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| async def deliver_trade_events( | async def deliver_trade_events( | ||||||
| 
 | 
 | ||||||
|  |     nurse: trio.Nursery, | ||||||
|     trade_event_stream: trio.MemoryReceiveChannel, |     trade_event_stream: trio.MemoryReceiveChannel, | ||||||
|     ems_stream: tractor.MsgStream, |     ems_stream: tractor.MsgStream, | ||||||
|     accounts_def: dict[str, str],  # eg. `'ib.main'` -> `'DU999999'` |     accounts_def: dict[str, str],  # eg. `'ib.main'` -> `'DU999999'` | ||||||
|  | @ -718,6 +820,45 @@ async def deliver_trade_events( | ||||||
|                 # unwrap needed data from ib_insync internal types |                 # unwrap needed data from ib_insync internal types | ||||||
|                 trade: Trade = item |                 trade: Trade = item | ||||||
|                 status: OrderStatus = trade.orderStatus |                 status: OrderStatus = trade.orderStatus | ||||||
|  |                 ib_status_key = status.status.lower() | ||||||
|  | 
 | ||||||
|  |                 acctid = accounts_def.inverse[trade.order.account] | ||||||
|  | 
 | ||||||
|  |                 # double check there is no error when | ||||||
|  |                 # cancelling.. gawwwd | ||||||
|  |                 if ib_status_key == 'cancelled': | ||||||
|  |                     last_log = trade.log[-1] | ||||||
|  |                     if ( | ||||||
|  |                         last_log.message | ||||||
|  |                         and 'Error' not in last_log.message | ||||||
|  |                     ): | ||||||
|  |                         ib_status_key = trade.log[-2].status | ||||||
|  |                         print(ib_status_key) | ||||||
|  | 
 | ||||||
|  |                 elif ib_status_key == 'inactive': | ||||||
|  |                     async def sched_cancel(): | ||||||
|  |                         log.warning( | ||||||
|  |                             'OH GAWD an inactive order..scheduling a cancel\n' | ||||||
|  |                             f'{pformat(item)}' | ||||||
|  |                         ) | ||||||
|  |                         proxy = proxies[acctid] | ||||||
|  |                         await proxy.submit_cancel(reqid=trade.order.orderId) | ||||||
|  |                         await trio.sleep(1) | ||||||
|  |                         nurse.start_soon(sched_cancel) | ||||||
|  | 
 | ||||||
|  |                     nurse.start_soon(sched_cancel) | ||||||
|  | 
 | ||||||
|  |                 status_key = ( | ||||||
|  |                     _statuses.get(ib_status_key) | ||||||
|  |                     or ib_status_key.lower() | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|  |                 remaining = status.remaining | ||||||
|  |                 if ( | ||||||
|  |                     status_key == 'filled' | ||||||
|  |                     and remaining == 0 | ||||||
|  |                 ): | ||||||
|  |                     status_key = 'closed' | ||||||
| 
 | 
 | ||||||
|                 # skip duplicate filled updates - we get the deats |                 # skip duplicate filled updates - we get the deats | ||||||
|                 # from the execution details event |                 # from the execution details event | ||||||
|  | @ -728,14 +869,14 @@ async def deliver_trade_events( | ||||||
|                     account=accounts_def.inverse[trade.order.account], |                     account=accounts_def.inverse[trade.order.account], | ||||||
| 
 | 
 | ||||||
|                     # everyone doin camel case.. |                     # everyone doin camel case.. | ||||||
|                     status=status.status.lower(),  # force lower case |                     status=status_key,  # force lower case | ||||||
| 
 | 
 | ||||||
|                     filled=status.filled, |                     filled=status.filled, | ||||||
|                     reason=status.whyHeld, |                     reason=status.whyHeld, | ||||||
| 
 | 
 | ||||||
|                     # this seems to not be necessarily up to date in the |                     # this seems to not be necessarily up to date in the | ||||||
|                     # execDetails event.. so we have to send it here I guess? |                     # execDetails event.. so we have to send it here I guess? | ||||||
|                     remaining=status.remaining, |                     remaining=remaining, | ||||||
| 
 | 
 | ||||||
|                     broker_details={'name': 'ib'}, |                     broker_details={'name': 'ib'}, | ||||||
|                 ) |                 ) | ||||||
|  | @ -870,17 +1011,25 @@ async def deliver_trade_events( | ||||||
|                 if err['reqid'] == -1: |                 if err['reqid'] == -1: | ||||||
|                     log.error(f'TWS external order error:\n{pformat(err)}') |                     log.error(f'TWS external order error:\n{pformat(err)}') | ||||||
| 
 | 
 | ||||||
|                 # TODO: what schema for this msg if we're going to make it |                 # TODO: we don't want to relay data feed / lookup errors | ||||||
|                 # portable across all backends? |                 # so we need some further filtering logic here.. | ||||||
|                 # msg = BrokerdError(**err) |                 # for most cases the 'status' block above should take | ||||||
|  |                 # care of this. | ||||||
|  |                 # await ems_stream.send(BrokerdStatus( | ||||||
|  |                 #     status='error', | ||||||
|  |                 #     reqid=err['reqid'], | ||||||
|  |                 #     reason=err['reason'], | ||||||
|  |                 #     time_ns=time.time_ns(), | ||||||
|  |                 #     account=accounts_def.inverse[trade.order.account], | ||||||
|  |                 #     broker_details={'name': 'ib'}, | ||||||
|  |                 # )) | ||||||
| 
 | 
 | ||||||
|             case 'position': |             case 'position': | ||||||
| 
 | 
 | ||||||
|                 cid, msg = pack_position(item) |                 cid, msg = pack_position(item) | ||||||
|                 log.info(f'New IB position msg: {msg}') |                 log.info(f'New IB position msg: {msg}') | ||||||
|                 # acctid = msg.account = accounts_def.inverse[msg.account] |  | ||||||
|                 # cuck ib and it's shitty fifo sys for pps! |                 # cuck ib and it's shitty fifo sys for pps! | ||||||
|                 # await ems_stream.send(msg) |                 continue | ||||||
| 
 | 
 | ||||||
|             case 'event': |             case 'event': | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -101,3 +101,30 @@ def percent_change( | ||||||
|     new: float, |     new: float, | ||||||
| ) -> float: | ) -> float: | ||||||
|     return pnl(init, new) * 100. |     return pnl(init, new) * 100. | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def diff_dict( | ||||||
|  |     d1: dict, | ||||||
|  |     d2: dict, | ||||||
|  | 
 | ||||||
|  | ) -> dict: | ||||||
|  |     d1_keys = set(d1.keys()) | ||||||
|  |     d2_keys = set(d2.keys()) | ||||||
|  |     shared_keys = d1_keys.intersection(d2_keys) | ||||||
|  |     shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]} | ||||||
|  |     added_keys = d2_keys - d1_keys | ||||||
|  |     added_deltas = {o: (None, d2[o]) for o in added_keys} | ||||||
|  |     deltas = {**shared_deltas, **added_deltas} | ||||||
|  |     return parse_deltas(deltas) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def parse_deltas(deltas: dict) -> dict: | ||||||
|  |     res = {} | ||||||
|  |     for k, v in deltas.items(): | ||||||
|  |         if isinstance(v[0], dict): | ||||||
|  |             tmp = diff_dict(v[0], v[1]) | ||||||
|  |             if tmp: | ||||||
|  |                 res[k] = tmp | ||||||
|  |         else: | ||||||
|  |             res[k] = v[1] | ||||||
|  |     return res | ||||||
|  |  | ||||||
|  | @ -83,7 +83,13 @@ class OrderBook: | ||||||
|         """Cancel an order (or alert) in the EMS. |         """Cancel an order (or alert) in the EMS. | ||||||
| 
 | 
 | ||||||
|         """ |         """ | ||||||
|         cmd = self._sent_orders[uuid] |         cmd = self._sent_orders.get(uuid) | ||||||
|  |         if not cmd: | ||||||
|  |             log.error( | ||||||
|  |                 f'Unknown order {uuid}!?\n' | ||||||
|  |                 f'Maybe there is a stale entry or line?\n' | ||||||
|  |                 f'You should report this as a bug!' | ||||||
|  |             ) | ||||||
|         msg = Cancel( |         msg = Cancel( | ||||||
|             oid=uuid, |             oid=uuid, | ||||||
|             symbol=cmd.symbol, |             symbol=cmd.symbol, | ||||||
|  | @ -149,10 +155,17 @@ async def relay_order_cmds_from_sync_code( | ||||||
|     book = get_orders() |     book = get_orders() | ||||||
|     async with book._from_order_book.subscribe() as orders_stream: |     async with book._from_order_book.subscribe() as orders_stream: | ||||||
|         async for cmd in orders_stream: |         async for cmd in orders_stream: | ||||||
|             if cmd.symbol == symbol_key: |             sym = cmd.symbol | ||||||
|                 log.info(f'Send order cmd:\n{pformat(cmd)}') |             msg = pformat(cmd) | ||||||
|  |             if sym == symbol_key: | ||||||
|  |                 log.info(f'Send order cmd:\n{msg}') | ||||||
|                 # send msg over IPC / wire |                 # send msg over IPC / wire | ||||||
|                 await to_ems_stream.send(cmd) |                 await to_ems_stream.send(cmd) | ||||||
|  |             else: | ||||||
|  |                 log.warning( | ||||||
|  |                     f'Ignoring unmatched order cmd for {sym} != {symbol_key}:' | ||||||
|  |                     f'\n{msg}' | ||||||
|  |                 ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @acm | @acm | ||||||
|  | @ -220,11 +233,19 @@ async def open_ems( | ||||||
|                 fqsn=fqsn, |                 fqsn=fqsn, | ||||||
|                 exec_mode=mode, |                 exec_mode=mode, | ||||||
| 
 | 
 | ||||||
|             ) as (ctx, (positions, accounts)), |             ) as ( | ||||||
|  |                 ctx, | ||||||
|  |                 ( | ||||||
|  |                     positions, | ||||||
|  |                     accounts, | ||||||
|  |                     dialogs, | ||||||
|  |                 ) | ||||||
|  |             ), | ||||||
| 
 | 
 | ||||||
|             # open 2-way trade command stream |             # open 2-way trade command stream | ||||||
|             ctx.open_stream() as trades_stream, |             ctx.open_stream() as trades_stream, | ||||||
|         ): |         ): | ||||||
|  |             # start sync code order msg delivery task | ||||||
|             async with trio.open_nursery() as n: |             async with trio.open_nursery() as n: | ||||||
|                 n.start_soon( |                 n.start_soon( | ||||||
|                     relay_order_cmds_from_sync_code, |                     relay_order_cmds_from_sync_code, | ||||||
|  | @ -232,4 +253,10 @@ async def open_ems( | ||||||
|                     trades_stream |                     trades_stream | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|                 yield book, trades_stream, positions, accounts |                 yield ( | ||||||
|  |                     book, | ||||||
|  |                     trades_stream, | ||||||
|  |                     positions, | ||||||
|  |                     accounts, | ||||||
|  |                     dialogs, | ||||||
|  |                 ) | ||||||
|  |  | ||||||
|  | @ -18,8 +18,8 @@ | ||||||
| In da suit parlances: "Execution management systems" | In da suit parlances: "Execution management systems" | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
|  | from collections import defaultdict, ChainMap | ||||||
| from contextlib import asynccontextmanager | from contextlib import asynccontextmanager | ||||||
| from dataclasses import dataclass, field |  | ||||||
| from math import isnan | from math import isnan | ||||||
| from pprint import pformat | from pprint import pformat | ||||||
| import time | import time | ||||||
|  | @ -27,6 +27,7 @@ from typing import ( | ||||||
|     AsyncIterator, |     AsyncIterator, | ||||||
|     Any, |     Any, | ||||||
|     Callable, |     Callable, | ||||||
|  |     Optional, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| from bidict import bidict | from bidict import bidict | ||||||
|  | @ -41,9 +42,16 @@ from ..data.types import Struct | ||||||
| from .._daemon import maybe_spawn_brokerd | from .._daemon import maybe_spawn_brokerd | ||||||
| from . import _paper_engine as paper | from . import _paper_engine as paper | ||||||
| from ._messages import ( | from ._messages import ( | ||||||
|     Status, Order, |     Order, | ||||||
|     BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, |     Status, | ||||||
|     BrokerdFill, BrokerdError, BrokerdPosition, |     # Cancel, | ||||||
|  |     BrokerdCancel, | ||||||
|  |     BrokerdOrder, | ||||||
|  |     # BrokerdOrderAck, | ||||||
|  |     BrokerdStatus, | ||||||
|  |     BrokerdFill, | ||||||
|  |     BrokerdError, | ||||||
|  |     BrokerdPosition, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -90,8 +98,7 @@ def mk_check( | ||||||
|     ) |     ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @dataclass | class _DarkBook(Struct): | ||||||
| class _DarkBook: |  | ||||||
|     ''' |     ''' | ||||||
|     EMS-trigger execution book. |     EMS-trigger execution book. | ||||||
| 
 | 
 | ||||||
|  | @ -116,17 +123,24 @@ class _DarkBook: | ||||||
|                 dict,  # cmd / msg type |                 dict,  # cmd / msg type | ||||||
|             ] |             ] | ||||||
|         ] |         ] | ||||||
|     ] = field(default_factory=dict) |     ] = {} | ||||||
| 
 | 
 | ||||||
|     # tracks most recent values per symbol each from data feed |     # tracks most recent values per symbol each from data feed | ||||||
|     lasts: dict[ |     lasts: dict[ | ||||||
|         str, |         str, | ||||||
|         float, |         float, | ||||||
|     ] = field(default_factory=dict) |     ] = {} | ||||||
| 
 | 
 | ||||||
|     # mapping of piker ems order ids to current brokerd order flow message |     # _ems_entries: dict[str, str] = {} | ||||||
|     _ems_entries: dict[str, str] = field(default_factory=dict) |     _active: dict = {} | ||||||
|     _ems2brokerd_ids: dict[str, str] = field(default_factory=bidict) | 
 | ||||||
|  |     # mapping of ems dialog ids to msg flow history | ||||||
|  |     _msgflows: defaultdict[ | ||||||
|  |         int, | ||||||
|  |         ChainMap[dict[str, dict]], | ||||||
|  |     ] = defaultdict(ChainMap) | ||||||
|  | 
 | ||||||
|  |     _ems2brokerd_ids: dict[str, str] = bidict() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| # XXX: this is in place to prevent accidental positions that are too | # XXX: this is in place to prevent accidental positions that are too | ||||||
|  | @ -181,6 +195,7 @@ async def clear_dark_triggers( | ||||||
|                 for oid, ( |                 for oid, ( | ||||||
|                     pred, |                     pred, | ||||||
|                     tf, |                     tf, | ||||||
|  |                     # TODO: send this msg instead? | ||||||
|                     cmd, |                     cmd, | ||||||
|                     percent_away, |                     percent_away, | ||||||
|                     abs_diff_away |                     abs_diff_away | ||||||
|  | @ -188,9 +203,9 @@ async def clear_dark_triggers( | ||||||
|                     tuple(execs.items()) |                     tuple(execs.items()) | ||||||
|                 ): |                 ): | ||||||
|                     if ( |                     if ( | ||||||
|                         not pred or |                         not pred | ||||||
|                         ttype not in tf or |                         or ttype not in tf | ||||||
|                         not pred(price) |                         or not pred(price) | ||||||
|                     ): |                     ): | ||||||
|                         # log.runtime( |                         # log.runtime( | ||||||
|                         #     f'skipping quote for {sym} ' |                         #     f'skipping quote for {sym} ' | ||||||
|  | @ -200,30 +215,29 @@ async def clear_dark_triggers( | ||||||
|                         # majority of iterations will be non-matches |                         # majority of iterations will be non-matches | ||||||
|                         continue |                         continue | ||||||
| 
 | 
 | ||||||
|  |                     brokerd_msg: Optional[BrokerdOrder] = None | ||||||
|                     match cmd: |                     match cmd: | ||||||
|                         # alert: nothing to do but relay a status |                         # alert: nothing to do but relay a status | ||||||
|                         # back to the requesting ems client |                         # back to the requesting ems client | ||||||
|                         case { |                         case Order(action='alert'): | ||||||
|                             'action': 'alert', |                             resp = 'triggered' | ||||||
|                         }: |  | ||||||
|                             resp = 'alert_triggered' |  | ||||||
| 
 | 
 | ||||||
|                         # executable order submission |                         # executable order submission | ||||||
|                         case { |                         case Order( | ||||||
|                             'action': action, |                             action=action, | ||||||
|                             'symbol': symbol, |                             symbol=symbol, | ||||||
|                             'account': account, |                             account=account, | ||||||
|                             'size': size, |                             size=size, | ||||||
|                         }: |                         ): | ||||||
|                             bfqsn: str = symbol.replace(f'.{broker}', '') |                             bfqsn: str = symbol.replace(f'.{broker}', '') | ||||||
|                             submit_price = price + abs_diff_away |                             submit_price = price + abs_diff_away | ||||||
|                             resp = 'dark_triggered'  # hidden on client-side |                             resp = 'triggered'  # hidden on client-side | ||||||
| 
 | 
 | ||||||
|                             log.info( |                             log.info( | ||||||
|                                 f'Dark order triggered for price {price}\n' |                                 f'Dark order triggered for price {price}\n' | ||||||
|                                 f'Submitting order @ price {submit_price}') |                                 f'Submitting order @ price {submit_price}') | ||||||
| 
 | 
 | ||||||
|                             live_req = BrokerdOrder( |                             brokerd_msg = BrokerdOrder( | ||||||
|                                 action=action, |                                 action=action, | ||||||
|                                 oid=oid, |                                 oid=oid, | ||||||
|                                 account=account, |                                 account=account, | ||||||
|  | @ -232,7 +246,8 @@ async def clear_dark_triggers( | ||||||
|                                 price=submit_price, |                                 price=submit_price, | ||||||
|                                 size=size, |                                 size=size, | ||||||
|                             ) |                             ) | ||||||
|                             await brokerd_orders_stream.send(live_req) | 
 | ||||||
|  |                             await brokerd_orders_stream.send(brokerd_msg) | ||||||
| 
 | 
 | ||||||
|                             # mark this entry as having sent an order |                             # mark this entry as having sent an order | ||||||
|                             # request.  the entry will be replaced once the |                             # request.  the entry will be replaced once the | ||||||
|  | @ -240,18 +255,19 @@ async def clear_dark_triggers( | ||||||
|                             # a ``BrokerdOrderAck`` msg including the |                             # a ``BrokerdOrderAck`` msg including the | ||||||
|                             # allocated unique ``BrokerdOrderAck.reqid`` key |                             # allocated unique ``BrokerdOrderAck.reqid`` key | ||||||
|                             # generated by the broker's own systems. |                             # generated by the broker's own systems. | ||||||
|                             book._ems_entries[oid] = live_req |                             # book._ems_entries[oid] = live_req | ||||||
|  |                             # book._msgflows[oid].maps.insert(0, live_req) | ||||||
| 
 | 
 | ||||||
|                         case _: |                         case _: | ||||||
|                             raise ValueError(f'Invalid dark book entry: {cmd}') |                             raise ValueError(f'Invalid dark book entry: {cmd}') | ||||||
| 
 | 
 | ||||||
|                     # fallthrough logic |                     # fallthrough logic | ||||||
|                     resp = Status( |                     status = Status( | ||||||
|                         oid=oid,  # ems dialog id |                         oid=oid,  # ems dialog id | ||||||
|                         time_ns=time.time_ns(), |                         time_ns=time.time_ns(), | ||||||
|                         resp=resp, |                         resp=resp, | ||||||
|                         trigger_price=price, |                         req=cmd, | ||||||
|                         brokerd_msg=cmd, |                         brokerd_msg=brokerd_msg, | ||||||
|                     ) |                     ) | ||||||
| 
 | 
 | ||||||
|                     # remove exec-condition from set |                     # remove exec-condition from set | ||||||
|  | @ -262,9 +278,18 @@ async def clear_dark_triggers( | ||||||
|                             f'pred for {oid} was already removed!?' |                             f'pred for {oid} was already removed!?' | ||||||
|                         ) |                         ) | ||||||
| 
 | 
 | ||||||
|  |                     # update actives | ||||||
|  |                     if cmd.action == 'alert': | ||||||
|  |                         # don't register the alert status (so it won't | ||||||
|  |                         # be reloaded by clients) since it's now | ||||||
|  |                         # complete / closed. | ||||||
|  |                         book._active.pop(oid) | ||||||
|  |                     else: | ||||||
|  |                         book._active[oid] = status | ||||||
|  | 
 | ||||||
|                     # send response to client-side |                     # send response to client-side | ||||||
|                     try: |                     try: | ||||||
|                         await ems_client_order_stream.send(resp) |                         await ems_client_order_stream.send(status) | ||||||
|                     except ( |                     except ( | ||||||
|                         trio.ClosedResourceError, |                         trio.ClosedResourceError, | ||||||
|                     ): |                     ): | ||||||
|  | @ -281,8 +306,7 @@ async def clear_dark_triggers( | ||||||
|         # print(f'execs scan took: {time.time() - start}') |         # print(f'execs scan took: {time.time() - start}') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @dataclass | class TradesRelay(Struct): | ||||||
| class TradesRelay: |  | ||||||
| 
 | 
 | ||||||
|     # for now we keep only a single connection open with |     # for now we keep only a single connection open with | ||||||
|     # each ``brokerd`` for simplicity. |     # each ``brokerd`` for simplicity. | ||||||
|  | @ -318,7 +342,10 @@ class Router(Struct): | ||||||
| 
 | 
 | ||||||
|     # order id to client stream map |     # order id to client stream map | ||||||
|     clients: set[tractor.MsgStream] = set() |     clients: set[tractor.MsgStream] = set() | ||||||
|     dialogues: dict[str, list[tractor.MsgStream]] = {} |     dialogues: dict[ | ||||||
|  |         str, | ||||||
|  |         list[tractor.MsgStream] | ||||||
|  |     ] = {} | ||||||
| 
 | 
 | ||||||
|     # brokername to trades-dialogues streams with ``brokerd`` actors |     # brokername to trades-dialogues streams with ``brokerd`` actors | ||||||
|     relays: dict[str, TradesRelay] = {} |     relays: dict[str, TradesRelay] = {} | ||||||
|  | @ -341,11 +368,12 @@ class Router(Struct): | ||||||
|         loglevel: str, |         loglevel: str, | ||||||
| 
 | 
 | ||||||
|     ) -> tuple[dict, tractor.MsgStream]: |     ) -> tuple[dict, tractor.MsgStream]: | ||||||
|         '''Open and yield ``brokerd`` trades dialogue context-stream if none |         ''' | ||||||
|         already exists. |         Open and yield ``brokerd`` trades dialogue context-stream if | ||||||
|  |         none already exists. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|         relay = self.relays.get(feed.mod.name) |         relay: TradesRelay = self.relays.get(feed.mod.name) | ||||||
| 
 | 
 | ||||||
|         if ( |         if ( | ||||||
|             relay is None |             relay is None | ||||||
|  | @ -381,6 +409,22 @@ class Router(Struct): | ||||||
| 
 | 
 | ||||||
|             relay.consumers -= 1 |             relay.consumers -= 1 | ||||||
| 
 | 
 | ||||||
|  |     async def client_broadcast( | ||||||
|  |         self, | ||||||
|  |         msg: dict, | ||||||
|  | 
 | ||||||
|  |     ) -> None: | ||||||
|  |         for client_stream in self.clients.copy(): | ||||||
|  |             try: | ||||||
|  |                 await client_stream.send(msg) | ||||||
|  |             except( | ||||||
|  |                 trio.ClosedResourceError, | ||||||
|  |                 trio.BrokenResourceError, | ||||||
|  |             ): | ||||||
|  |                 self.clients.remove(client_stream) | ||||||
|  |                 log.warning( | ||||||
|  |                     f'client for {client_stream} was already closed?') | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| _router: Router = None | _router: Router = None | ||||||
| 
 | 
 | ||||||
|  | @ -452,7 +496,6 @@ async def open_brokerd_trades_dialogue( | ||||||
|             async with ( |             async with ( | ||||||
|                 open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), |                 open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), | ||||||
|                 brokerd_ctx.open_stream() as brokerd_trades_stream, |                 brokerd_ctx.open_stream() as brokerd_trades_stream, | ||||||
| 
 |  | ||||||
|             ): |             ): | ||||||
|                 # XXX: really we only want one stream per `emsd` actor |                 # XXX: really we only want one stream per `emsd` actor | ||||||
|                 # to relay global `brokerd` order events unless we're |                 # to relay global `brokerd` order events unless we're | ||||||
|  | @ -502,14 +545,9 @@ async def open_brokerd_trades_dialogue( | ||||||
| 
 | 
 | ||||||
|                 task_status.started(relay) |                 task_status.started(relay) | ||||||
| 
 | 
 | ||||||
|                 await translate_and_relay_brokerd_events( |  | ||||||
|                     broker, |  | ||||||
|                     brokerd_trades_stream, |  | ||||||
|                     _router, |  | ||||||
|                 ) |  | ||||||
| 
 |  | ||||||
|                 # this context should block here indefinitely until |                 # this context should block here indefinitely until | ||||||
|                 # the ``brokerd`` task either dies or is cancelled |                 # the ``brokerd`` task either dies or is cancelled | ||||||
|  |                 await trio.sleep_forever() | ||||||
| 
 | 
 | ||||||
|         finally: |         finally: | ||||||
|             # parent context must have been closed |             # parent context must have been closed | ||||||
|  | @ -561,15 +599,14 @@ async def translate_and_relay_brokerd_events( | ||||||
| 
 | 
 | ||||||
|         broker       ems |         broker       ems | ||||||
|         'error'  ->  log it locally (for now) |         'error'  ->  log it locally (for now) | ||||||
|         'status' ->  relabel as 'broker_<status>', if complete send 'executed' |         ('status' | 'fill'} ->  relayed through see ``Status`` msg type. | ||||||
|         'fill'   ->  'broker_filled' |  | ||||||
| 
 | 
 | ||||||
|     Currently handled status values from IB: |     Currently handled status values from IB: | ||||||
|         {'presubmitted', 'submitted', 'cancelled', 'inactive'} |         {'presubmitted', 'submitted', 'cancelled', 'inactive'} | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     book = router.get_dark_book(broker) |     book: _DarkBook = router.get_dark_book(broker) | ||||||
|     relay = router.relays[broker] |     relay: TradesRelay = router.relays[broker] | ||||||
| 
 | 
 | ||||||
|     assert relay.brokerd_dialogue == brokerd_trades_stream |     assert relay.brokerd_dialogue == brokerd_trades_stream | ||||||
| 
 | 
 | ||||||
|  | @ -601,30 +638,16 @@ async def translate_and_relay_brokerd_events( | ||||||
| 
 | 
 | ||||||
|                 # fan-out-relay position msgs immediately by |                 # fan-out-relay position msgs immediately by | ||||||
|                 # broadcasting updates on all client streams |                 # broadcasting updates on all client streams | ||||||
|                 for client_stream in router.clients.copy(): |                 await router.client_broadcast(pos_msg) | ||||||
|                     try: |  | ||||||
|                         await client_stream.send(pos_msg) |  | ||||||
|                     except( |  | ||||||
|                         trio.ClosedResourceError, |  | ||||||
|                         trio.BrokenResourceError, |  | ||||||
|                     ): |  | ||||||
|                         router.clients.remove(client_stream) |  | ||||||
|                         log.warning( |  | ||||||
|                             f'client for {client_stream} was already closed?') |  | ||||||
| 
 |  | ||||||
|                 continue |                 continue | ||||||
| 
 | 
 | ||||||
|             # BrokerdOrderAck |             # BrokerdOrderAck | ||||||
|  |             # initial response to brokerd order request | ||||||
|             case { |             case { | ||||||
|                 'name': 'ack', |                 'name': 'ack', | ||||||
|                 'reqid': reqid,  # brokerd generated order-request id |                 'reqid': reqid,  # brokerd generated order-request id | ||||||
|                 'oid': oid,  # ems order-dialog id |                 'oid': oid,  # ems order-dialog id | ||||||
|             } if ( |             }: | ||||||
|                 entry := book._ems_entries.get(oid) |  | ||||||
|             ): |  | ||||||
|                 # initial response to brokerd order request |  | ||||||
|                 # if name == 'ack': |  | ||||||
| 
 |  | ||||||
|                 # register the brokerd request id (that was generated |                 # register the brokerd request id (that was generated | ||||||
|                 # / created internally by the broker backend) with our |                 # / created internally by the broker backend) with our | ||||||
|                 # local ems order id for reverse lookup later. |                 # local ems order id for reverse lookup later. | ||||||
|  | @ -639,23 +662,24 @@ async def translate_and_relay_brokerd_events( | ||||||
|                 # new order which has not yet be registered into the |                 # new order which has not yet be registered into the | ||||||
|                 # local ems book, insert it now and handle 2 cases: |                 # local ems book, insert it now and handle 2 cases: | ||||||
| 
 | 
 | ||||||
|                 # - the order has previously been requested to be |                 # 1. the order has previously been requested to be | ||||||
|                 # cancelled by the ems controlling client before we |                 # cancelled by the ems controlling client before we | ||||||
|                 # received this ack, in which case we relay that cancel |                 # received this ack, in which case we relay that cancel | ||||||
|                 # signal **asap** to the backend broker |                 # signal **asap** to the backend broker | ||||||
|                 action = getattr(entry, 'action', None) |                 # status = book._active.get(oid) | ||||||
|                 if action and action == 'cancel': |                 status_msg = book._active[oid] | ||||||
|  |                 req = status_msg.req | ||||||
|  |                 if req and req.action == 'cancel': | ||||||
|                     # assign newly providerd broker backend request id |                     # assign newly providerd broker backend request id | ||||||
|                     entry.reqid = reqid |                     # and tell broker to cancel immediately | ||||||
|  |                     status_msg.reqid = reqid | ||||||
|  |                     await brokerd_trades_stream.send(req) | ||||||
| 
 | 
 | ||||||
|                     # tell broker to cancel immediately |                 # 2. the order is now active and will be mirrored in | ||||||
|                     await brokerd_trades_stream.send(entry) |  | ||||||
| 
 |  | ||||||
|                 # - the order is now active and will be mirrored in |  | ||||||
|                 # our book -> registered as live flow |                 # our book -> registered as live flow | ||||||
|                 else: |                 else: | ||||||
|                     # update the flow with the ack msg |                     # TODO: should we relay this ack state? | ||||||
|                     book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) |                     status_msg.resp = 'pending' | ||||||
| 
 | 
 | ||||||
|                 # no msg to client necessary |                 # no msg to client necessary | ||||||
|                 continue |                 continue | ||||||
|  | @ -666,11 +690,9 @@ async def translate_and_relay_brokerd_events( | ||||||
|                 'oid': oid,  # ems order-dialog id |                 'oid': oid,  # ems order-dialog id | ||||||
|                 'reqid': reqid,  # brokerd generated order-request id |                 'reqid': reqid,  # brokerd generated order-request id | ||||||
|                 'symbol': sym, |                 'symbol': sym, | ||||||
|                 'broker_details': details, |             } if status_msg := book._active.get(oid): | ||||||
|                 # 'reason': reason, | 
 | ||||||
|             }: |  | ||||||
|                 msg = BrokerdError(**brokerd_msg) |                 msg = BrokerdError(**brokerd_msg) | ||||||
|                 resp = 'broker_errored' |  | ||||||
|                 log.error(pformat(msg))  # XXX make one when it's blank? |                 log.error(pformat(msg))  # XXX make one when it's blank? | ||||||
| 
 | 
 | ||||||
|                 # TODO: figure out how this will interact with EMS clients |                 # TODO: figure out how this will interact with EMS clients | ||||||
|  | @ -680,43 +702,64 @@ async def translate_and_relay_brokerd_events( | ||||||
|                 # some unexpected failure - something we need to think more |                 # some unexpected failure - something we need to think more | ||||||
|                 # about.  In most default situations, with composed orders |                 # about.  In most default situations, with composed orders | ||||||
|                 # (ex.  brackets), most brokers seem to use a oca policy. |                 # (ex.  brackets), most brokers seem to use a oca policy. | ||||||
|  |                 ems_client_order_stream = router.dialogues[oid] | ||||||
|  |                 status_msg.resp = 'error' | ||||||
|  |                 status_msg.brokerd_msg = msg | ||||||
|  |                 book._active[oid] = status_msg | ||||||
|  |                 await ems_client_order_stream.send(status_msg) | ||||||
| 
 | 
 | ||||||
|             # BrokerdStatus |             # BrokerdStatus | ||||||
|             case { |             case { | ||||||
|                 'name': 'status', |                 'name': 'status', | ||||||
|                 'status': status, |                 'status': status, | ||||||
|                 'reqid': reqid,  # brokerd generated order-request id |                 'reqid': reqid,  # brokerd generated order-request id | ||||||
|                 # TODO: feels like the wrong msg for this field? |  | ||||||
|                 'remaining': remaining, |  | ||||||
| 
 | 
 | ||||||
|             } if ( |             } if ( | ||||||
|                 oid := book._ems2brokerd_ids.inverse.get(reqid) |                 (oid := book._ems2brokerd_ids.inverse.get(reqid)) | ||||||
|  |                 and status in ( | ||||||
|  |                     'canceled', | ||||||
|  |                     'open', | ||||||
|  |                     'closed', | ||||||
|  |                 ) | ||||||
|             ): |             ): | ||||||
|                 msg = BrokerdStatus(**brokerd_msg) |                 msg = BrokerdStatus(**brokerd_msg) | ||||||
| 
 | 
 | ||||||
|                 # TODO: should we flatten out these cases and/or should |                 # TODO: maybe pack this into a composite type that | ||||||
|                 # they maybe even eventually be separate messages? |                 # contains both the IPC stream as well the | ||||||
|                 if status == 'cancelled': |                 # msg-chain/dialog. | ||||||
|                     log.info(f'Cancellation for {oid} is complete!') |                 ems_client_order_stream = router.dialogues[oid] | ||||||
|  |                 status_msg = book._active[oid] | ||||||
|  |                 old_resp = status_msg.resp | ||||||
|  |                 status_msg.resp = status | ||||||
| 
 | 
 | ||||||
|                 if status == 'filled': |                 # retrieve existing live flow | ||||||
|                     # conditional execution is fully complete, no more |                 old_reqid = status_msg.reqid | ||||||
|                     # fills for the noted order |                 if old_reqid and old_reqid != reqid: | ||||||
|                     if not remaining: |                     log.warning( | ||||||
|  |                         f'Brokerd order id change for {oid}:\n' | ||||||
|  |                         f'{old_reqid} -> {reqid}' | ||||||
|  |                     ) | ||||||
| 
 | 
 | ||||||
|                         resp = 'broker_executed' |                 status_msg.reqid = reqid  # THIS LINE IS CRITICAL! | ||||||
|  |                 status_msg.brokerd_msg = msg | ||||||
|  |                 status_msg.src = msg.broker_details['name'] | ||||||
|  |                 await ems_client_order_stream.send(status_msg) | ||||||
| 
 | 
 | ||||||
|                         # be sure to pop this stream from our dialogue set |                 if status == 'closed': | ||||||
|                         # since the order dialogue should be done. |  | ||||||
|                     log.info(f'Execution for {oid} is complete!') |                     log.info(f'Execution for {oid} is complete!') | ||||||
| 
 | 
 | ||||||
|                     # just log it |                     # only if we already rxed a fill then probably | ||||||
|                     else: |                     # this clear is fully complete? (frickin ib..) | ||||||
|                         log.info(f'{broker} filled {msg}') |                     if old_resp == 'fill': | ||||||
|  |                         status_msg = book._active.pop(oid) | ||||||
| 
 | 
 | ||||||
|                 else: |                 elif status == 'canceled': | ||||||
|                     # one of {submitted, cancelled} |                     log.cancel(f'Cancellation for {oid} is complete!') | ||||||
|                     resp = 'broker_' + msg.status | 
 | ||||||
|  |                 else:  # open | ||||||
|  |                     # relayed from backend but probably not handled so | ||||||
|  |                     # just log it | ||||||
|  |                     log.info(f'{broker} opened order {msg}') | ||||||
| 
 | 
 | ||||||
|             # BrokerdFill |             # BrokerdFill | ||||||
|             case { |             case { | ||||||
|  | @ -728,82 +771,111 @@ async def translate_and_relay_brokerd_events( | ||||||
|             ): |             ): | ||||||
|                 # proxy through the "fill" result(s) |                 # proxy through the "fill" result(s) | ||||||
|                 msg = BrokerdFill(**brokerd_msg) |                 msg = BrokerdFill(**brokerd_msg) | ||||||
|                 resp = 'broker_filled' |                 log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') | ||||||
|                 log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') |  | ||||||
| 
 | 
 | ||||||
|             # unknown valid message case? |                 ems_client_order_stream = router.dialogues[oid] | ||||||
|             # case { |  | ||||||
|             #     'name': name, |  | ||||||
|             #     'symbol': sym, |  | ||||||
|             #     'reqid': reqid,  # brokerd generated order-request id |  | ||||||
|             #     # 'oid': oid,  # ems order-dialog id |  | ||||||
|             #     'broker_details': details, |  | ||||||
| 
 | 
 | ||||||
|             # } if ( |                 # wtf a fill can come after 'closed' from ib? | ||||||
|             #     book._ems2brokerd_ids.inverse.get(reqid) is None |                 status_msg = book._active[oid] | ||||||
|             # ): |  | ||||||
|             #     # TODO: pretty sure we can drop this now? |  | ||||||
| 
 | 
 | ||||||
|             #     # XXX: paper clearing special cases |                 # only if we already rxed a 'closed' | ||||||
|             #     # paper engine race case: ``Client.submit_limit()`` hasn't |                 # this clear is fully complete? (frickin ib..) | ||||||
|             #     # returned yet and provided an output reqid to register |                 # if status_msg.resp == 'closed': | ||||||
|             #     # locally, so we need to retreive the oid that was already |                 #     status_msg = book._active.pop(oid) | ||||||
|             #     # packed at submission since we already know it ahead of |  | ||||||
|             #     # time |  | ||||||
|             #     paper = details.get('paper_info') |  | ||||||
|             #     ext = details.get('external') |  | ||||||
| 
 | 
 | ||||||
|             #     if paper: |                 status_msg.resp = 'fill' | ||||||
|             #         # paperboi keeps the ems id up front |                 status_msg.reqid = reqid | ||||||
|             #         oid = paper['oid'] |                 status_msg.brokerd_msg = msg | ||||||
|  |                 await ems_client_order_stream.send(status_msg) | ||||||
| 
 | 
 | ||||||
|             #     elif ext: |             # ``Status`` containing an embedded order msg which | ||||||
|             #         # may be an order msg specified as "external" to the |             # should be loaded as a "pre-existing open order" from the | ||||||
|             #         # piker ems flow (i.e. generated by some other |             # brokerd backend. | ||||||
|             #         # external broker backend client (like tws for ib) |             case { | ||||||
|             #         log.error(f"External trade event {name}@{ext}") |                 'name': 'status', | ||||||
|  |                 'resp': status, | ||||||
|  |                 'reqid': reqid,  # brokerd generated order-request id | ||||||
|  |             }: | ||||||
|  |                 if ( | ||||||
|  |                     status != 'open' | ||||||
|  |                 ): | ||||||
|  |                     # TODO: check for an oid we might know since it was | ||||||
|  |                     # registered from a previous order/status load? | ||||||
|  |                     log.error( | ||||||
|  |                         f'Unknown/transient status msg:\n' | ||||||
|  |                         f'{pformat(brokerd_msg)}\n' | ||||||
|  |                         'Unable to relay message to client side!?' | ||||||
|  |                     ) | ||||||
| 
 | 
 | ||||||
|             #     else: |                 # TODO: we probably want some kind of "tagging" system | ||||||
|             #         # something is out of order, we don't have an oid for |                 # for external order submissions like this eventually | ||||||
|             #         # this broker-side message. |                 # to be able to more formally handle multi-player | ||||||
|             #         log.error( |                 # trading... | ||||||
|             #             f'Unknown oid: {oid} for msg {name}:\n' |                 else: | ||||||
|             #             f'{pformat(brokerd_msg)}\n' |                     # existing open backend order which we broadcast to | ||||||
|             #             'Unable to relay message to client side!?' |                     # all currently connected clients. | ||||||
|             #         ) |                     log.info( | ||||||
|  |                         f'Relaying existing open order:\n {brokerd_msg}' | ||||||
|  |                     ) | ||||||
| 
 | 
 | ||||||
|             #     continue |                     # use backend request id as our ems id though this | ||||||
|  |                     # may end up with collisions? | ||||||
|  |                     status_msg = Status(**brokerd_msg) | ||||||
|  |                     order = Order(**status_msg.req) | ||||||
|  |                     assert order.price and order.size | ||||||
|  |                     status_msg.req = order | ||||||
|  | 
 | ||||||
|  |                     assert status_msg.src  # source tag? | ||||||
|  |                     oid = str(status_msg.reqid) | ||||||
|  | 
 | ||||||
|  |                     # attempt to avoid collisions | ||||||
|  |                     status_msg.reqid = oid | ||||||
|  |                     assert status_msg.resp == 'open' | ||||||
|  | 
 | ||||||
|  |                     # register this existing broker-side dialog | ||||||
|  |                     book._ems2brokerd_ids[oid] = reqid | ||||||
|  |                     book._active[oid] = status_msg | ||||||
|  | 
 | ||||||
|  |                     # fan-out-relay position msgs immediately by | ||||||
|  |                     # broadcasting updates on all client streams | ||||||
|  |                     await router.client_broadcast(status_msg) | ||||||
|  | 
 | ||||||
|  |                 # don't fall through | ||||||
|  |                 continue | ||||||
|  | 
 | ||||||
|  |             # brokerd error | ||||||
|  |             case { | ||||||
|  |                 'name': 'status', | ||||||
|  |                 'status': 'error', | ||||||
|  |             }: | ||||||
|  |                 log.error(f'Broker error:\n{pformat(brokerd_msg)}') | ||||||
|  |                 # XXX: we presume the brokerd cancels its own order | ||||||
|  | 
 | ||||||
|  |             # TOO FAST ``BrokerdStatus`` that arrives | ||||||
|  |             # before the ``BrokerdAck``. | ||||||
|  |             case { | ||||||
|  |                 # XXX: sometimes there is a race with the backend (like | ||||||
|  |                 # `ib` where the pending stauts will be related before | ||||||
|  |                 # the ack, in which case we just ignore the faster | ||||||
|  |                 # pending msg and wait for our expected ack to arrive | ||||||
|  |                 # later (i.e. the first block below should enter). | ||||||
|  |                 'name': 'status', | ||||||
|  |                 'status': status, | ||||||
|  |                 'reqid': reqid, | ||||||
|  |             }: | ||||||
|  |                 status_msg = book._active[oid] | ||||||
|  |                 log.warning( | ||||||
|  |                     'Unhandled broker status for dialog:\n' | ||||||
|  |                     f'{pformat(status_msg)}\n' | ||||||
|  |                     f'{pformat(brokerd_msg)}\n' | ||||||
|  |                 ) | ||||||
| 
 | 
 | ||||||
|             case _: |             case _: | ||||||
|                 raise ValueError(f'Brokerd message {brokerd_msg} is invalid') |                 raise ValueError(f'Brokerd message {brokerd_msg} is invalid') | ||||||
| 
 | 
 | ||||||
|         # retrieve existing live flow |         # XXX: ugh sometimes we don't access it? | ||||||
|         entry = book._ems_entries[oid] |         if status_msg: | ||||||
|         assert entry.oid == oid |             del status_msg | ||||||
| 
 |  | ||||||
|         old_reqid = entry.reqid |  | ||||||
|         if old_reqid and old_reqid != reqid: |  | ||||||
|             log.warning( |  | ||||||
|                 f'Brokerd order id change for {oid}:\n' |  | ||||||
|                 f'{old_reqid} -> {reqid}' |  | ||||||
|             ) |  | ||||||
| 
 |  | ||||||
|         # Create and relay response status message |  | ||||||
|         # to requesting EMS client |  | ||||||
|         try: |  | ||||||
|             ems_client_order_stream = router.dialogues[oid] |  | ||||||
|             await ems_client_order_stream.send( |  | ||||||
|                 Status( |  | ||||||
|                     oid=oid, |  | ||||||
|                     resp=resp, |  | ||||||
|                     time_ns=time.time_ns(), |  | ||||||
|                     broker_reqid=reqid, |  | ||||||
|                     brokerd_msg=msg, |  | ||||||
|                 ) |  | ||||||
|             ) |  | ||||||
|         except KeyError: |  | ||||||
|             log.error( |  | ||||||
|                 f'Received `brokerd` msg for unknown client with oid: {oid}') |  | ||||||
| 
 | 
 | ||||||
|     # TODO: do we want this to keep things cleaned up? |     # TODO: do we want this to keep things cleaned up? | ||||||
|     # it might require a special status from brokerd to affirm the |     # it might require a special status from brokerd to affirm the | ||||||
|  | @ -829,27 +901,36 @@ async def process_client_order_cmds( | ||||||
|     async for cmd in client_order_stream: |     async for cmd in client_order_stream: | ||||||
|         log.info(f'Received order cmd:\n{pformat(cmd)}') |         log.info(f'Received order cmd:\n{pformat(cmd)}') | ||||||
| 
 | 
 | ||||||
|         oid = cmd['oid'] |         # CAWT DAMN we need struct support! | ||||||
|  |         oid = str(cmd['oid']) | ||||||
|  | 
 | ||||||
|         # register this stream as an active dialogue for this order id |         # register this stream as an active dialogue for this order id | ||||||
|         # such that translated message from the brokerd backend can be |         # such that translated message from the brokerd backend can be | ||||||
|         # routed (relayed) to **just** that client stream (and in theory |         # routed (relayed) to **just** that client stream (and in theory | ||||||
|         # others who are registered for such order affiliated msgs). |         # others who are registered for such order affiliated msgs). | ||||||
|         client_dialogues[oid] = client_order_stream |         client_dialogues[oid] = client_order_stream | ||||||
|         reqid = dark_book._ems2brokerd_ids.inverse.get(oid) |         reqid = dark_book._ems2brokerd_ids.inverse.get(oid) | ||||||
|         live_entry = dark_book._ems_entries.get(oid) | 
 | ||||||
|  |         # any dark/live status which is current | ||||||
|  |         status = dark_book._active.get(oid) | ||||||
| 
 | 
 | ||||||
|         match cmd: |         match cmd: | ||||||
|             # existing live-broker order cancel |             # existing live-broker order cancel | ||||||
|             case { |             case { | ||||||
|                 'action': 'cancel', |                 'action': 'cancel', | ||||||
|                 'oid': oid, |                 'oid': oid, | ||||||
|             } if live_entry: |             } if ( | ||||||
|                 reqid = live_entry.reqid |                 (status := dark_book._active.get(oid)) | ||||||
|                 msg = BrokerdCancel( |                 and status.resp in ('open', 'pending') | ||||||
|  |             ): | ||||||
|  |                 reqid = status.reqid | ||||||
|  |                 order = status.req | ||||||
|  |                 to_brokerd_msg = BrokerdCancel( | ||||||
|                     oid=oid, |                     oid=oid, | ||||||
|                     reqid=reqid, |                     reqid=reqid, | ||||||
|                     time_ns=time.time_ns(), |                     time_ns=time.time_ns(), | ||||||
|                     account=live_entry.account, |                     # account=live_entry.account, | ||||||
|  |                     account=order.account, | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|                 # NOTE: cancel response will be relayed back in messages |                 # NOTE: cancel response will be relayed back in messages | ||||||
|  | @ -859,39 +940,53 @@ async def process_client_order_cmds( | ||||||
|                     log.info( |                     log.info( | ||||||
|                         f'Submitting cancel for live order {reqid}' |                         f'Submitting cancel for live order {reqid}' | ||||||
|                     ) |                     ) | ||||||
|                     await brokerd_order_stream.send(msg) |                     await brokerd_order_stream.send(to_brokerd_msg) | ||||||
| 
 | 
 | ||||||
|                 else: |                 else: | ||||||
|                     # this might be a cancel for an order that hasn't been |                     # this might be a cancel for an order that hasn't been | ||||||
|                     # acked yet by a brokerd, so register a cancel for when |                     # acked yet by a brokerd, so register a cancel for when | ||||||
|                     # the order ack does show up later such that the brokerd |                     # the order ack does show up later such that the brokerd | ||||||
|                     # order request can be cancelled at that time. |                     # order request can be cancelled at that time. | ||||||
|                     dark_book._ems_entries[oid] = msg |                     # dark_book._ems_entries[oid] = msg | ||||||
|  |                     # special case for now.. | ||||||
|  |                     status.req = to_brokerd_msg | ||||||
| 
 | 
 | ||||||
|             # dark trigger cancel |             # dark trigger cancel | ||||||
|             case { |             case { | ||||||
|                 'action': 'cancel', |                 'action': 'cancel', | ||||||
|                 'oid': oid, |                 'oid': oid, | ||||||
|             } if not live_entry: |             } if ( | ||||||
|                 try: |                 status and status.resp == 'dark_open' | ||||||
|  |                 # or status and status.req | ||||||
|  |             ): | ||||||
|                 # remove from dark book clearing |                 # remove from dark book clearing | ||||||
|                     dark_book.orders[symbol].pop(oid, None) |                 entry = dark_book.orders[symbol].pop(oid, None) | ||||||
|  |                 if entry: | ||||||
|  |                     ( | ||||||
|  |                         pred, | ||||||
|  |                         tickfilter, | ||||||
|  |                         cmd, | ||||||
|  |                         percent_away, | ||||||
|  |                         abs_diff_away | ||||||
|  |                     ) = entry | ||||||
| 
 | 
 | ||||||
|                     # tell client side that we've cancelled the |                     # tell client side that we've cancelled the | ||||||
|                     # dark-trigger order |                     # dark-trigger order | ||||||
|                     await client_order_stream.send( |                     status.resp = 'canceled' | ||||||
|                         Status( |                     status.req = cmd | ||||||
|                             resp='dark_cancelled', | 
 | ||||||
|                             oid=oid, |                     await client_order_stream.send(status) | ||||||
|                             time_ns=time.time_ns(), |  | ||||||
|                         ) |  | ||||||
|                     ) |  | ||||||
|                     # de-register this client dialogue |                     # de-register this client dialogue | ||||||
|                     router.dialogues.pop(oid) |                     router.dialogues.pop(oid) | ||||||
|  |                     dark_book._active.pop(oid) | ||||||
| 
 | 
 | ||||||
|                 except KeyError: |                 else: | ||||||
|                     log.exception(f'No dark order for {symbol}?') |                     log.exception(f'No dark order for {symbol}?') | ||||||
| 
 | 
 | ||||||
|  |             # TODO: eventually we should be receiving | ||||||
|  |             # this struct on the wire unpacked in a scoped protocol | ||||||
|  |             # setup with ``tractor``. | ||||||
|  | 
 | ||||||
|             # live order submission |             # live order submission | ||||||
|             case { |             case { | ||||||
|                 'oid': oid, |                 'oid': oid, | ||||||
|  | @ -899,11 +994,9 @@ async def process_client_order_cmds( | ||||||
|                 'price': trigger_price, |                 'price': trigger_price, | ||||||
|                 'size': size, |                 'size': size, | ||||||
|                 'action': ('buy' | 'sell') as action, |                 'action': ('buy' | 'sell') as action, | ||||||
|                 'exec_mode': 'live', |                 'exec_mode': ('live' | 'paper'), | ||||||
|             }: |             }: | ||||||
|                 # TODO: eventually we should be receiving |                 # TODO: relay this order msg directly? | ||||||
|                 # this struct on the wire unpacked in a scoped protocol |  | ||||||
|                 # setup with ``tractor``. |  | ||||||
|                 req = Order(**cmd) |                 req = Order(**cmd) | ||||||
|                 broker = req.brokers[0] |                 broker = req.brokers[0] | ||||||
| 
 | 
 | ||||||
|  | @ -912,13 +1005,13 @@ async def process_client_order_cmds( | ||||||
|                 # aren't expectig their own name, but should they? |                 # aren't expectig their own name, but should they? | ||||||
|                 sym = fqsn.replace(f'.{broker}', '') |                 sym = fqsn.replace(f'.{broker}', '') | ||||||
| 
 | 
 | ||||||
|                 if live_entry is not None: |                 if status is not None: | ||||||
|                     # sanity check on emsd id |  | ||||||
|                     assert live_entry.oid == oid |  | ||||||
|                     reqid = live_entry.reqid |  | ||||||
|                     # if we already had a broker order id then |                     # if we already had a broker order id then | ||||||
|                     # this is likely an order update commmand. |                     # this is likely an order update commmand. | ||||||
|                     log.info(f"Modifying live {broker} order: {reqid}") |                     log.info(f"Modifying live {broker} order: {reqid}") | ||||||
|  |                     reqid = status.reqid | ||||||
|  |                     status.req = req | ||||||
|  |                     status.resp = 'pending' | ||||||
| 
 | 
 | ||||||
|                 msg = BrokerdOrder( |                 msg = BrokerdOrder( | ||||||
|                     oid=oid,  # no ib support for oids... |                     oid=oid,  # no ib support for oids... | ||||||
|  | @ -935,6 +1028,18 @@ async def process_client_order_cmds( | ||||||
|                     account=req.account, |                     account=req.account, | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|  |                 if status is None: | ||||||
|  |                     status = Status( | ||||||
|  |                         oid=oid, | ||||||
|  |                         reqid=reqid, | ||||||
|  |                         resp='pending', | ||||||
|  |                         time_ns=time.time_ns(), | ||||||
|  |                         brokerd_msg=msg, | ||||||
|  |                         req=req, | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |                 dark_book._active[oid] = status | ||||||
|  | 
 | ||||||
|                 # send request to backend |                 # send request to backend | ||||||
|                 # XXX: the trades data broker response loop |                 # XXX: the trades data broker response loop | ||||||
|                 # (``translate_and_relay_brokerd_events()`` above) will |                 # (``translate_and_relay_brokerd_events()`` above) will | ||||||
|  | @ -950,7 +1055,7 @@ async def process_client_order_cmds( | ||||||
|                 # client, before that ack, when the ack does arrive we |                 # client, before that ack, when the ack does arrive we | ||||||
|                 # immediately take the reqid from the broker and cancel |                 # immediately take the reqid from the broker and cancel | ||||||
|                 # that live order asap. |                 # that live order asap. | ||||||
|                 dark_book._ems_entries[oid] = msg |                 # dark_book._msgflows[oid].maps.insert(0, msg.to_dict()) | ||||||
| 
 | 
 | ||||||
|             # dark-order / alert submission |             # dark-order / alert submission | ||||||
|             case { |             case { | ||||||
|  | @ -966,9 +1071,11 @@ async def process_client_order_cmds( | ||||||
|                     # submit order to local EMS book and scan loop, |                     # submit order to local EMS book and scan loop, | ||||||
|                     # effectively a local clearing engine, which |                     # effectively a local clearing engine, which | ||||||
|                     # scans for conditions and triggers matching executions |                     # scans for conditions and triggers matching executions | ||||||
|                     exec_mode in ('dark', 'paper') |                     exec_mode in ('dark',) | ||||||
|                     or action == 'alert' |                     or action == 'alert' | ||||||
|             ): |             ): | ||||||
|  |                 req = Order(**cmd) | ||||||
|  | 
 | ||||||
|                 # Auto-gen scanner predicate: |                 # Auto-gen scanner predicate: | ||||||
|                 # we automatically figure out what the alert check |                 # we automatically figure out what the alert check | ||||||
|                 # condition should be based on the current first |                 # condition should be based on the current first | ||||||
|  | @ -1015,23 +1122,25 @@ async def process_client_order_cmds( | ||||||
|                 )[oid] = ( |                 )[oid] = ( | ||||||
|                     pred, |                     pred, | ||||||
|                     tickfilter, |                     tickfilter, | ||||||
|                     cmd, |                     req, | ||||||
|                     percent_away, |                     percent_away, | ||||||
|                     abs_diff_away |                     abs_diff_away | ||||||
|                 ) |                 ) | ||||||
|                 resp = 'dark_submitted' |                 resp = 'dark_open' | ||||||
| 
 | 
 | ||||||
|                 # alerts have special msgs to distinguish |                 # alerts have special msgs to distinguish | ||||||
|                 if action == 'alert': |                 # if action == 'alert': | ||||||
|                     resp = 'alert_submitted' |                 #     resp = 'open' | ||||||
| 
 | 
 | ||||||
|                 await client_order_stream.send( |                 status = Status( | ||||||
|                     Status( |  | ||||||
|                     resp=resp, |                     resp=resp, | ||||||
|                     oid=oid, |                     oid=oid, | ||||||
|                     time_ns=time.time_ns(), |                     time_ns=time.time_ns(), | ||||||
|  |                     req=req, | ||||||
|  |                     src='dark', | ||||||
|                 ) |                 ) | ||||||
|                 ) |                 dark_book._active[oid] = status | ||||||
|  |                 await client_order_stream.send(status) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| @tractor.context | @tractor.context | ||||||
|  | @ -1099,10 +1208,9 @@ async def _emsd_main( | ||||||
|     ): |     ): | ||||||
| 
 | 
 | ||||||
|         # XXX: this should be initial price quote from target provider |         # XXX: this should be initial price quote from target provider | ||||||
|         first_quote = feed.first_quotes[fqsn] |         first_quote: dict = feed.first_quotes[fqsn] | ||||||
| 
 |         book: _DarkBook = _router.get_dark_book(broker) | ||||||
|         book = _router.get_dark_book(broker) |         book.lasts[fqsn]: float = first_quote['last'] | ||||||
|         book.lasts[fqsn] = first_quote['last'] |  | ||||||
| 
 | 
 | ||||||
|         # open a stream with the brokerd backend for order |         # open a stream with the brokerd backend for order | ||||||
|         # flow dialogue |         # flow dialogue | ||||||
|  | @ -1129,12 +1237,25 @@ async def _emsd_main( | ||||||
|             await ems_ctx.started(( |             await ems_ctx.started(( | ||||||
|                 relay.positions, |                 relay.positions, | ||||||
|                 list(relay.accounts), |                 list(relay.accounts), | ||||||
|  |                 book._active, | ||||||
|             )) |             )) | ||||||
| 
 | 
 | ||||||
|             # establish 2-way stream with requesting order-client and |             # establish 2-way stream with requesting order-client and | ||||||
|             # begin handling inbound order requests and updates |             # begin handling inbound order requests and updates | ||||||
|             async with ems_ctx.open_stream() as ems_client_order_stream: |             async with ems_ctx.open_stream() as ems_client_order_stream: | ||||||
| 
 | 
 | ||||||
|  |                 # register the client side before startingn the | ||||||
|  |                 # brokerd-side relay task to ensure the client is | ||||||
|  |                 # delivered all exisiting open orders on startup. | ||||||
|  |                 _router.clients.add(ems_client_order_stream) | ||||||
|  | 
 | ||||||
|  |                 n.start_soon( | ||||||
|  |                     translate_and_relay_brokerd_events, | ||||||
|  |                     broker, | ||||||
|  |                     brokerd_stream, | ||||||
|  |                     _router, | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|                 # trigger scan and exec loop |                 # trigger scan and exec loop | ||||||
|                 n.start_soon( |                 n.start_soon( | ||||||
|                     clear_dark_triggers, |                     clear_dark_triggers, | ||||||
|  | @ -1149,7 +1270,6 @@ async def _emsd_main( | ||||||
| 
 | 
 | ||||||
|                 # start inbound (from attached client) order request processing |                 # start inbound (from attached client) order request processing | ||||||
|                 try: |                 try: | ||||||
|                     _router.clients.add(ems_client_order_stream) |  | ||||||
| 
 | 
 | ||||||
|                     # main entrypoint, run here until cancelled. |                     # main entrypoint, run here until cancelled. | ||||||
|                     await process_client_order_cmds( |                     await process_client_order_cmds( | ||||||
|  |  | ||||||
|  | @ -18,24 +18,92 @@ | ||||||
| Clearing sub-system message and protocols. | Clearing sub-system message and protocols. | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
| from typing import Optional, Union | # from collections import ( | ||||||
|  | #     ChainMap, | ||||||
|  | #     deque, | ||||||
|  | # ) | ||||||
|  | from typing import ( | ||||||
|  |     Optional, | ||||||
|  |     Literal, | ||||||
|  | ) | ||||||
| 
 | 
 | ||||||
| from ..data._source import Symbol | from ..data._source import Symbol | ||||||
| from ..data.types import Struct | from ..data.types import Struct | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # TODO: a composite for tracking msg flow on 2-legged | ||||||
|  | # dialogs. | ||||||
|  | # class Dialog(ChainMap): | ||||||
|  | #     ''' | ||||||
|  | #     Msg collection abstraction to easily track the state changes of | ||||||
|  | #     a msg flow in one high level, query-able and immutable construct. | ||||||
|  | 
 | ||||||
|  | #     The main use case is to query data from a (long-running) | ||||||
|  | #     msg-transaction-sequence | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | #     ''' | ||||||
|  | #     def update( | ||||||
|  | #         self, | ||||||
|  | #         msg, | ||||||
|  | #     ) -> None: | ||||||
|  | #         self.maps.insert(0, msg.to_dict()) | ||||||
|  | 
 | ||||||
|  | #     def flatten(self) -> dict: | ||||||
|  | #         return dict(self) | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| # TODO: ``msgspec`` stuff worth paying attention to: | # TODO: ``msgspec`` stuff worth paying attention to: | ||||||
| # - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution | # - schema evolution: | ||||||
|  | # https://jcristharif.com/msgspec/usage.html#schema-evolution | ||||||
|  | # - for eg. ``BrokerdStatus``, instead just have separate messages? | ||||||
| # - use literals for a common msg determined by diff keys? | # - use literals for a common msg determined by diff keys? | ||||||
| #   - https://jcristharif.com/msgspec/usage.html#literal | #   - https://jcristharif.com/msgspec/usage.html#literal | ||||||
| #   - for eg. ``BrokerdStatus``, instead just have separate messages? |  | ||||||
| 
 | 
 | ||||||
| # -------------- | # -------------- | ||||||
| # Client -> emsd | # Client -> emsd | ||||||
| # -------------- | # -------------- | ||||||
| 
 | 
 | ||||||
|  | class Order(Struct): | ||||||
|  | 
 | ||||||
|  |     # TODO: ideally we can combine these 2 fields into | ||||||
|  |     # 1 and just use the size polarity to determine a buy/sell. | ||||||
|  |     # i would like to see this become more like | ||||||
|  |     # https://jcristharif.com/msgspec/usage.html#literal | ||||||
|  |     # action: Literal[ | ||||||
|  |     #     'live', | ||||||
|  |     #     'dark', | ||||||
|  |     #     'alert', | ||||||
|  |     # ] | ||||||
|  | 
 | ||||||
|  |     action: Literal[ | ||||||
|  |         'buy', | ||||||
|  |         'sell', | ||||||
|  |         'alert', | ||||||
|  |     ] | ||||||
|  |     # determines whether the create execution | ||||||
|  |     # will be submitted to the ems or directly to | ||||||
|  |     # the backend broker | ||||||
|  |     exec_mode: Literal[ | ||||||
|  |         'dark', | ||||||
|  |         'live', | ||||||
|  |         # 'paper',  no right? | ||||||
|  |     ] | ||||||
|  | 
 | ||||||
|  |     # internal ``emdsd`` unique "order id" | ||||||
|  |     oid: str  # uuid4 | ||||||
|  |     symbol: str | Symbol | ||||||
|  |     account: str  # should we set a default as '' ? | ||||||
|  | 
 | ||||||
|  |     price: float | ||||||
|  |     size: float  # -ve is "sell", +ve is "buy" | ||||||
|  | 
 | ||||||
|  |     brokers: Optional[list[str]] = [] | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| class Cancel(Struct): | class Cancel(Struct): | ||||||
|     '''Cancel msg for removing a dark (ems triggered) or |     ''' | ||||||
|  |     Cancel msg for removing a dark (ems triggered) or | ||||||
|     broker-submitted (live) trigger/order. |     broker-submitted (live) trigger/order. | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|  | @ -44,32 +112,6 @@ class Cancel(Struct): | ||||||
|     symbol: str |     symbol: str | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class Order(Struct): |  | ||||||
| 
 |  | ||||||
|     # TODO: use ``msgspec.Literal`` |  | ||||||
|     # https://jcristharif.com/msgspec/usage.html#literal |  | ||||||
|     action: str  # {'buy', 'sell', 'alert'} |  | ||||||
|     # internal ``emdsd`` unique "order id" |  | ||||||
|     oid: str  # uuid4 |  | ||||||
|     symbol: Union[str, Symbol] |  | ||||||
|     account: str  # should we set a default as '' ? |  | ||||||
| 
 |  | ||||||
|     price: float |  | ||||||
|     # TODO: could we drop the ``.action`` field above and instead just |  | ||||||
|     # use +/- values here? Would make the msg smaller at the sake of a |  | ||||||
|     # teensie fp precision? |  | ||||||
|     size: float |  | ||||||
|     brokers: list[str] |  | ||||||
| 
 |  | ||||||
|     # Assigned once initial ack is received |  | ||||||
|     # ack_time_ns: Optional[int] = None |  | ||||||
| 
 |  | ||||||
|     # determines whether the create execution |  | ||||||
|     # will be submitted to the ems or directly to |  | ||||||
|     # the backend broker |  | ||||||
|     exec_mode: str  # {'dark', 'live', 'paper'} |  | ||||||
| 
 |  | ||||||
| 
 |  | ||||||
| # -------------- | # -------------- | ||||||
| # Client <- emsd | # Client <- emsd | ||||||
| # -------------- | # -------------- | ||||||
|  | @ -79,37 +121,39 @@ class Order(Struct): | ||||||
| class Status(Struct): | class Status(Struct): | ||||||
| 
 | 
 | ||||||
|     name: str = 'status' |     name: str = 'status' | ||||||
|     oid: str  # uuid4 |  | ||||||
|     time_ns: int |     time_ns: int | ||||||
|  |     oid: str  # uuid4 ems-order dialog id | ||||||
| 
 | 
 | ||||||
|     # { |     resp: Literal[ | ||||||
|     #   'dark_submitted', |       'pending',  # acked by broker but not yet open | ||||||
|     #   'dark_cancelled', |       'open', | ||||||
|     #   'dark_triggered', |       'dark_open',  # dark/algo triggered order is open in ems clearing loop | ||||||
| 
 |       'triggered',  # above triggered order sent to brokerd, or an alert closed | ||||||
|     #   'broker_submitted', |       'closed',  # fully cleared all size/units | ||||||
|     #   'broker_cancelled', |       'fill',  # partial execution | ||||||
|     #   'broker_executed', |       'canceled', | ||||||
|     #   'broker_filled', |       'error', | ||||||
|     #   'broker_errored', |     ] | ||||||
| 
 |  | ||||||
|     #   'alert_submitted', |  | ||||||
|     #   'alert_triggered', |  | ||||||
| 
 |  | ||||||
|     # } |  | ||||||
|     resp: str  # "response", see above |  | ||||||
| 
 |  | ||||||
|     # trigger info |  | ||||||
|     trigger_price: Optional[float] = None |  | ||||||
|     # price: float |  | ||||||
| 
 |  | ||||||
|     # broker: Optional[str] = None |  | ||||||
| 
 | 
 | ||||||
|     # this maps normally to the ``BrokerdOrder.reqid`` below, an id |     # this maps normally to the ``BrokerdOrder.reqid`` below, an id | ||||||
|     # normally allocated internally by the backend broker routing system |     # normally allocated internally by the backend broker routing system | ||||||
|     broker_reqid: Optional[Union[int, str]] = None |     reqid: Optional[int | str] = None | ||||||
| 
 | 
 | ||||||
|     # for relaying backend msg data "through" the ems layer |     # the (last) source order/request msg if provided | ||||||
|  |     # (eg. the Order/Cancel which causes this msg) and | ||||||
|  |     # acts as a back-reference to the corresponding | ||||||
|  |     # request message which was the source of this msg. | ||||||
|  |     req: Optional[Order | Cancel] = None | ||||||
|  | 
 | ||||||
|  |     # XXX: better design/name here? | ||||||
|  |     # flag that can be set to indicate a message for an order | ||||||
|  |     # event that wasn't originated by piker's emsd (eg. some external | ||||||
|  |     # trading system which does it's own order control but that you | ||||||
|  |     # might want to "track" using piker UIs/systems). | ||||||
|  |     src: Optional[str] = None | ||||||
|  | 
 | ||||||
|  |     # for relaying a boxed brokerd-dialog-side msg data "through" the | ||||||
|  |     # ems layer to clients. | ||||||
|     brokerd_msg: dict = {} |     brokerd_msg: dict = {} | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -131,25 +175,28 @@ class BrokerdCancel(Struct): | ||||||
|     # for setting a unique order id then this value will be relayed back |     # for setting a unique order id then this value will be relayed back | ||||||
|     # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` |     # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` | ||||||
|     # field |     # field | ||||||
|     reqid: Optional[Union[int, str]] = None |     reqid: Optional[int | str] = None | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class BrokerdOrder(Struct): | class BrokerdOrder(Struct): | ||||||
| 
 | 
 | ||||||
|     action: str  # {buy, sell} |  | ||||||
|     oid: str |     oid: str | ||||||
|     account: str |     account: str | ||||||
|     time_ns: int |     time_ns: int | ||||||
| 
 | 
 | ||||||
|  |     # TODO: if we instead rely on a +ve/-ve size to determine | ||||||
|  |     # the action we more or less don't need this field right? | ||||||
|  |     action: str = ''  # {buy, sell} | ||||||
|  | 
 | ||||||
|     # "broker request id": broker specific/internal order id if this is |     # "broker request id": broker specific/internal order id if this is | ||||||
|     # None, creates a new order otherwise if the id is valid the backend |     # None, creates a new order otherwise if the id is valid the backend | ||||||
|     # api must modify the existing matching order. If the broker allows |     # api must modify the existing matching order. If the broker allows | ||||||
|     # for setting a unique order id then this value will be relayed back |     # for setting a unique order id then this value will be relayed back | ||||||
|     # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` |     # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` | ||||||
|     # field |     # field | ||||||
|     reqid: Optional[Union[int, str]] = None |     reqid: Optional[int | str] = None | ||||||
| 
 | 
 | ||||||
|     symbol: str  # symbol.<providername> ? |     symbol: str  # fqsn | ||||||
|     price: float |     price: float | ||||||
|     size: float |     size: float | ||||||
| 
 | 
 | ||||||
|  | @ -170,7 +217,7 @@ class BrokerdOrderAck(Struct): | ||||||
|     name: str = 'ack' |     name: str = 'ack' | ||||||
| 
 | 
 | ||||||
|     # defined and provided by backend |     # defined and provided by backend | ||||||
|     reqid: Union[int, str] |     reqid: int | str | ||||||
| 
 | 
 | ||||||
|     # emsd id originally sent in matching request msg |     # emsd id originally sent in matching request msg | ||||||
|     oid: str |     oid: str | ||||||
|  | @ -180,30 +227,22 @@ class BrokerdOrderAck(Struct): | ||||||
| class BrokerdStatus(Struct): | class BrokerdStatus(Struct): | ||||||
| 
 | 
 | ||||||
|     name: str = 'status' |     name: str = 'status' | ||||||
|     reqid: Union[int, str] |     reqid: int | str | ||||||
|     time_ns: int |     time_ns: int | ||||||
|  |     status: Literal[ | ||||||
|  |         'open', | ||||||
|  |         'canceled', | ||||||
|  |         'fill', | ||||||
|  |         'pending', | ||||||
|  |         'error', | ||||||
|  |     ] | ||||||
| 
 | 
 | ||||||
|     # XXX: should be best effort set for every update |     account: str | ||||||
|     account: str = '' |  | ||||||
| 
 |  | ||||||
|     # TODO: instead (ack, pending, open, fill, clos(ed), cancelled) |  | ||||||
|     # { |  | ||||||
|     #   'submitted', |  | ||||||
|     #   'cancelled', |  | ||||||
|     #   'filled', |  | ||||||
|     # } |  | ||||||
|     status: str |  | ||||||
| 
 |  | ||||||
|     filled: float = 0.0 |     filled: float = 0.0 | ||||||
|     reason: str = '' |     reason: str = '' | ||||||
|     remaining: float = 0.0 |     remaining: float = 0.0 | ||||||
| 
 | 
 | ||||||
|     # XXX: better design/name here? |     # external: bool = False | ||||||
|     # flag that can be set to indicate a message for an order |  | ||||||
|     # event that wasn't originated by piker's emsd (eg. some external |  | ||||||
|     # trading system which does it's own order control but that you |  | ||||||
|     # might want to "track" using piker UIs/systems). |  | ||||||
|     external: bool = False |  | ||||||
| 
 | 
 | ||||||
|     # XXX: not required schema as of yet |     # XXX: not required schema as of yet | ||||||
|     broker_details: dict = { |     broker_details: dict = { | ||||||
|  | @ -218,7 +257,7 @@ class BrokerdFill(Struct): | ||||||
| 
 | 
 | ||||||
|     ''' |     ''' | ||||||
|     name: str = 'fill' |     name: str = 'fill' | ||||||
|     reqid: Union[int, str] |     reqid: int | str | ||||||
|     time_ns: int |     time_ns: int | ||||||
| 
 | 
 | ||||||
|     # order exeuction related |     # order exeuction related | ||||||
|  | @ -248,7 +287,7 @@ class BrokerdError(Struct): | ||||||
| 
 | 
 | ||||||
|     # if no brokerd order request was actually submitted (eg. we errored |     # if no brokerd order request was actually submitted (eg. we errored | ||||||
|     # at the ``pikerd`` layer) then there will be ``reqid`` allocated. |     # at the ``pikerd`` layer) then there will be ``reqid`` allocated. | ||||||
|     reqid: Optional[Union[int, str]] = None |     reqid: Optional[int | str] = None | ||||||
| 
 | 
 | ||||||
|     symbol: str |     symbol: str | ||||||
|     reason: str |     reason: str | ||||||
|  |  | ||||||
|  | @ -45,8 +45,13 @@ from ..data._normalize import iterticks | ||||||
| from ..data._source import unpack_fqsn | from ..data._source import unpack_fqsn | ||||||
| from ..log import get_logger | from ..log import get_logger | ||||||
| from ._messages import ( | from ._messages import ( | ||||||
|     BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, |     BrokerdCancel, | ||||||
|     BrokerdFill, BrokerdPosition, BrokerdError |     BrokerdOrder, | ||||||
|  |     BrokerdOrderAck, | ||||||
|  |     BrokerdStatus, | ||||||
|  |     BrokerdFill, | ||||||
|  |     BrokerdPosition, | ||||||
|  |     BrokerdError, | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | @ -94,6 +99,10 @@ class PaperBoi: | ||||||
|         ''' |         ''' | ||||||
|         is_modify: bool = False |         is_modify: bool = False | ||||||
| 
 | 
 | ||||||
|  |         if action == 'alert': | ||||||
|  |             # bypass all fill simulation | ||||||
|  |             return reqid | ||||||
|  | 
 | ||||||
|         entry = self._reqids.get(reqid) |         entry = self._reqids.get(reqid) | ||||||
|         if entry: |         if entry: | ||||||
|             # order is already existing, this is a modify |             # order is already existing, this is a modify | ||||||
|  | @ -104,10 +113,6 @@ class PaperBoi: | ||||||
|             # register order internally |             # register order internally | ||||||
|             self._reqids[reqid] = (oid, symbol, action, price) |             self._reqids[reqid] = (oid, symbol, action, price) | ||||||
| 
 | 
 | ||||||
|         if action == 'alert': |  | ||||||
|             # bypass all fill simulation |  | ||||||
|             return reqid |  | ||||||
| 
 |  | ||||||
|         # TODO: net latency model |         # TODO: net latency model | ||||||
|         # we checkpoint here quickly particulalry |         # we checkpoint here quickly particulalry | ||||||
|         # for dark orders since we want the dark_executed |         # for dark orders since we want the dark_executed | ||||||
|  | @ -119,7 +124,9 @@ class PaperBoi: | ||||||
|             size = -size |             size = -size | ||||||
| 
 | 
 | ||||||
|         msg = BrokerdStatus( |         msg = BrokerdStatus( | ||||||
|             status='submitted', |             status='open', | ||||||
|  |             # account=f'paper_{self.broker}', | ||||||
|  |             account='paper', | ||||||
|             reqid=reqid, |             reqid=reqid, | ||||||
|             time_ns=time.time_ns(), |             time_ns=time.time_ns(), | ||||||
|             filled=0.0, |             filled=0.0, | ||||||
|  | @ -136,7 +143,14 @@ class PaperBoi: | ||||||
|             ) or ( |             ) or ( | ||||||
|             action == 'sell' and (clear_price := self.last_bid[0]) >= price |             action == 'sell' and (clear_price := self.last_bid[0]) >= price | ||||||
|         ): |         ): | ||||||
|             await self.fake_fill(symbol, clear_price, size, action, reqid, oid) |             await self.fake_fill( | ||||||
|  |                 symbol, | ||||||
|  |                 clear_price, | ||||||
|  |                 size, | ||||||
|  |                 action, | ||||||
|  |                 reqid, | ||||||
|  |                 oid, | ||||||
|  |             ) | ||||||
| 
 | 
 | ||||||
|         else: |         else: | ||||||
|             # register this submissions as a paper live order |             # register this submissions as a paper live order | ||||||
|  | @ -178,7 +192,9 @@ class PaperBoi: | ||||||
|         await trio.sleep(0.05) |         await trio.sleep(0.05) | ||||||
| 
 | 
 | ||||||
|         msg = BrokerdStatus( |         msg = BrokerdStatus( | ||||||
|             status='cancelled', |             status='canceled', | ||||||
|  |             # account=f'paper_{self.broker}', | ||||||
|  |             account='paper', | ||||||
|             reqid=reqid, |             reqid=reqid, | ||||||
|             time_ns=time.time_ns(), |             time_ns=time.time_ns(), | ||||||
|             broker_details={'name': 'paperboi'}, |             broker_details={'name': 'paperboi'}, | ||||||
|  | @ -230,25 +246,23 @@ class PaperBoi: | ||||||
|         self._trade_ledger.update(fill_msg.to_dict()) |         self._trade_ledger.update(fill_msg.to_dict()) | ||||||
| 
 | 
 | ||||||
|         if order_complete: |         if order_complete: | ||||||
| 
 |  | ||||||
|             msg = BrokerdStatus( |             msg = BrokerdStatus( | ||||||
| 
 |  | ||||||
|                 reqid=reqid, |                 reqid=reqid, | ||||||
|                 time_ns=time.time_ns(), |                 time_ns=time.time_ns(), | ||||||
| 
 |                 # account=f'paper_{self.broker}', | ||||||
|                 status='filled', |                 account='paper', | ||||||
|  |                 status='closed', | ||||||
|                 filled=size, |                 filled=size, | ||||||
|                 remaining=0 if order_complete else remaining, |                 remaining=0 if order_complete else remaining, | ||||||
| 
 |                 # broker_details={ | ||||||
|                 broker_details={ |                 #     'paper_info': { | ||||||
|                     'paper_info': { |                 #         'oid': oid, | ||||||
|                         'oid': oid, |                 #     }, | ||||||
|                     }, |                 #     'action': action, | ||||||
|                     'action': action, |                 #     'size': size, | ||||||
|                     'size': size, |                 #     'price': price, | ||||||
|                     'price': price, |                 #     'name': self.broker, | ||||||
|                     'name': self.broker, |                 # }, | ||||||
|                 }, |  | ||||||
|             ) |             ) | ||||||
|             await self.ems_trades_stream.send(msg) |             await self.ems_trades_stream.send(msg) | ||||||
| 
 | 
 | ||||||
|  | @ -393,30 +407,33 @@ async def handle_order_requests( | ||||||
|     # order_request: dict |     # order_request: dict | ||||||
|     async for request_msg in ems_order_stream: |     async for request_msg in ems_order_stream: | ||||||
| 
 | 
 | ||||||
|         action = request_msg['action'] |         # action = request_msg['action'] | ||||||
| 
 |         match request_msg: | ||||||
|         if action in {'buy', 'sell'}: |         # if action in {'buy', 'sell'}: | ||||||
| 
 |             case {'action': ('buy' | 'sell')}: | ||||||
|             account = request_msg['account'] |                 order = BrokerdOrder(**request_msg) | ||||||
|  |                 account = order.account | ||||||
|                 if account != 'paper': |                 if account != 'paper': | ||||||
|                     log.error( |                     log.error( | ||||||
|                         'This is a paper account,' |                         'This is a paper account,' | ||||||
|                         ' only a `paper` selection is valid' |                         ' only a `paper` selection is valid' | ||||||
|                     ) |                     ) | ||||||
|                     await ems_order_stream.send(BrokerdError( |                     await ems_order_stream.send(BrokerdError( | ||||||
|                     oid=request_msg['oid'], |                         # oid=request_msg['oid'], | ||||||
|                     symbol=request_msg['symbol'], |                         oid=order.oid, | ||||||
|  |                         # symbol=request_msg['symbol'], | ||||||
|  |                         symbol=order.symbol, | ||||||
|                         reason=f'Paper only. No account found: `{account}` ?', |                         reason=f'Paper only. No account found: `{account}` ?', | ||||||
|                     )) |                     )) | ||||||
|                     continue |                     continue | ||||||
| 
 | 
 | ||||||
|             # validate |             # validate | ||||||
|             order = BrokerdOrder(**request_msg) |             # order = BrokerdOrder(**request_msg) | ||||||
| 
 | 
 | ||||||
|             if order.reqid is None: |                 # if order.reqid is None: | ||||||
|                 reqid = str(uuid.uuid4()) |                 #     reqid =  | ||||||
|             else: |                 # else: | ||||||
|                 reqid = order.reqid |                 reqid = order.reqid or str(uuid.uuid4()) | ||||||
| 
 | 
 | ||||||
|                 # deliver ack that order has been submitted to broker routing |                 # deliver ack that order has been submitted to broker routing | ||||||
|                 await ems_order_stream.send( |                 await ems_order_stream.send( | ||||||
|  | @ -447,14 +464,14 @@ async def handle_order_requests( | ||||||
|                     reqid=reqid, |                     reqid=reqid, | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|         elif action == 'cancel': |             # elif action == 'cancel': | ||||||
|  |             case {'action': 'cancel'}: | ||||||
|                 msg = BrokerdCancel(**request_msg) |                 msg = BrokerdCancel(**request_msg) | ||||||
| 
 |  | ||||||
|                 await client.submit_cancel( |                 await client.submit_cancel( | ||||||
|                     reqid=msg.reqid |                     reqid=msg.reqid | ||||||
|                 ) |                 ) | ||||||
| 
 | 
 | ||||||
|         else: |             case _: | ||||||
|                 log.error(f'Unknown order command: {request_msg}') |                 log.error(f'Unknown order command: {request_msg}') | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -37,7 +37,7 @@ import time | ||||||
| from math import isnan | from math import isnan | ||||||
| 
 | 
 | ||||||
| from bidict import bidict | from bidict import bidict | ||||||
| import msgpack | from msgspec.msgpack import encode, decode | ||||||
| import pyqtgraph as pg | import pyqtgraph as pg | ||||||
| import numpy as np | import numpy as np | ||||||
| import tractor | import tractor | ||||||
|  | @ -774,12 +774,13 @@ async def stream_quotes( | ||||||
|     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: |     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: | ||||||
|         # send subs topics to server |         # send subs topics to server | ||||||
|         resp = await ws.send_message( |         resp = await ws.send_message( | ||||||
|             msgpack.dumps({'streams': list(tbks.values())}) | 
 | ||||||
|  |             encode({'streams': list(tbks.values())}) | ||||||
|         ) |         ) | ||||||
|         log.info(resp) |         log.info(resp) | ||||||
| 
 | 
 | ||||||
|         async def recv() -> dict[str, Any]: |         async def recv() -> dict[str, Any]: | ||||||
|             return msgpack.loads((await ws.get_message()), encoding='utf-8') |             return decode((await ws.get_message()), encoding='utf-8') | ||||||
| 
 | 
 | ||||||
|         streams = (await recv())['streams'] |         streams = (await recv())['streams'] | ||||||
|         log.info(f"Subscribed to {streams}") |         log.info(f"Subscribed to {streams}") | ||||||
|  |  | ||||||
|  | @ -18,6 +18,7 @@ | ||||||
| Built-in (extension) types. | Built-in (extension) types. | ||||||
| 
 | 
 | ||||||
| """ | """ | ||||||
|  | import sys | ||||||
| from typing import Optional | from typing import Optional | ||||||
| from pprint import pformat | from pprint import pformat | ||||||
| 
 | 
 | ||||||
|  | @ -42,8 +43,13 @@ class Struct( | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|     def __repr__(self): |     def __repr__(self): | ||||||
|  |         # only turn on pprint when we detect a python REPL | ||||||
|  |         # at runtime B) | ||||||
|  |         if hasattr(sys, 'ps1'): | ||||||
|             return f'Struct({pformat(self.to_dict())})' |             return f'Struct({pformat(self.to_dict())})' | ||||||
| 
 | 
 | ||||||
|  |         return super().__repr__() | ||||||
|  | 
 | ||||||
|     def copy( |     def copy( | ||||||
|         self, |         self, | ||||||
|         update: Optional[dict] = None, |         update: Optional[dict] = None, | ||||||
|  |  | ||||||
							
								
								
									
										19
									
								
								piker/pp.py
								
								
								
								
							
							
						
						
									
										19
									
								
								piker/pp.py
								
								
								
								
							|  | @ -134,6 +134,8 @@ class Position(Struct): | ||||||
|     # unique backend symbol id |     # unique backend symbol id | ||||||
|     bsuid: str |     bsuid: str | ||||||
| 
 | 
 | ||||||
|  |     split_ratio: Optional[int] = None | ||||||
|  | 
 | ||||||
|     # ordered record of known constituent trade messages |     # ordered record of known constituent trade messages | ||||||
|     clears: dict[ |     clears: dict[ | ||||||
|         Union[str, int, Status],  # trade id |         Union[str, int, Status],  # trade id | ||||||
|  | @ -159,6 +161,9 @@ class Position(Struct): | ||||||
|         clears = d.pop('clears') |         clears = d.pop('clears') | ||||||
|         expiry = d.pop('expiry') |         expiry = d.pop('expiry') | ||||||
| 
 | 
 | ||||||
|  |         if self.split_ratio is None: | ||||||
|  |             d.pop('split_ratio') | ||||||
|  | 
 | ||||||
|         # TODO: we need to figure out how to have one top level |         # TODO: we need to figure out how to have one top level | ||||||
|         # listing venue here even when the backend isn't providing |         # listing venue here even when the backend isn't providing | ||||||
|         # it via the trades ledger.. |         # it via the trades ledger.. | ||||||
|  | @ -384,12 +389,22 @@ class Position(Struct): | ||||||
|                 asize_h.append(accum_size) |                 asize_h.append(accum_size) | ||||||
|                 ppu_h.append(ppu_h[-1]) |                 ppu_h.append(ppu_h[-1]) | ||||||
| 
 | 
 | ||||||
|         return ppu_h[-1] if ppu_h else 0 |         final_ppu = ppu_h[-1] if ppu_h else 0 | ||||||
|  | 
 | ||||||
|  |         # handle any split info entered (for now) manually by user | ||||||
|  |         if self.split_ratio is not None: | ||||||
|  |             final_ppu /= self.split_ratio | ||||||
|  | 
 | ||||||
|  |         return final_ppu | ||||||
| 
 | 
 | ||||||
|     def calc_size(self) -> float: |     def calc_size(self) -> float: | ||||||
|         size: float = 0 |         size: float = 0 | ||||||
|         for tid, entry in self.clears.items(): |         for tid, entry in self.clears.items(): | ||||||
|             size += entry['size'] |             size += entry['size'] | ||||||
|  | 
 | ||||||
|  |         if self.split_ratio is not None: | ||||||
|  |             size = round(size * self.split_ratio) | ||||||
|  | 
 | ||||||
|         return size |         return size | ||||||
| 
 | 
 | ||||||
|     def minimize_clears( |     def minimize_clears( | ||||||
|  | @ -848,6 +863,7 @@ def open_pps( | ||||||
|         size = entry['size'] |         size = entry['size'] | ||||||
|         # TODO: remove but, handle old field name for now |         # TODO: remove but, handle old field name for now | ||||||
|         ppu = entry.get('ppu', entry.get('be_price', 0)) |         ppu = entry.get('ppu', entry.get('be_price', 0)) | ||||||
|  |         split_ratio = entry.get('split_ratio') | ||||||
| 
 | 
 | ||||||
|         expiry = entry.get('expiry') |         expiry = entry.get('expiry') | ||||||
|         if expiry: |         if expiry: | ||||||
|  | @ -857,6 +873,7 @@ def open_pps( | ||||||
|             Symbol.from_fqsn(fqsn, info={}), |             Symbol.from_fqsn(fqsn, info={}), | ||||||
|             size=size, |             size=size, | ||||||
|             ppu=ppu, |             ppu=ppu, | ||||||
|  |             split_ratio=split_ratio, | ||||||
|             expiry=expiry, |             expiry=expiry, | ||||||
|             bsuid=entry['bsuid'], |             bsuid=entry['bsuid'], | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -140,9 +140,9 @@ class LineEditor: | ||||||
| 
 | 
 | ||||||
|     ) -> LevelLine: |     ) -> LevelLine: | ||||||
| 
 | 
 | ||||||
|         staged_line = self._active_staged_line |         # staged_line = self._active_staged_line | ||||||
|         if not staged_line: |         # if not staged_line: | ||||||
|             raise RuntimeError("No line is currently staged!?") |         #     raise RuntimeError("No line is currently staged!?") | ||||||
| 
 | 
 | ||||||
|         # for now, until submission reponse arrives |         # for now, until submission reponse arrives | ||||||
|         line.hide_labels() |         line.hide_labels() | ||||||
|  |  | ||||||
|  | @ -49,16 +49,21 @@ from ._position import ( | ||||||
|     SettingsPane, |     SettingsPane, | ||||||
| ) | ) | ||||||
| from ._forms import FieldsForm | from ._forms import FieldsForm | ||||||
| # from ._label import FormatLabel |  | ||||||
| from ._window import MultiStatus | from ._window import MultiStatus | ||||||
| from ..clearing._messages import Order, BrokerdPosition | from ..clearing._messages import ( | ||||||
|  |     Order, | ||||||
|  |     Status, | ||||||
|  |     # BrokerdOrder, | ||||||
|  |     # BrokerdStatus, | ||||||
|  |     BrokerdPosition, | ||||||
|  | ) | ||||||
| from ._forms import open_form_input_handling | from ._forms import open_form_input_handling | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| log = get_logger(__name__) | log = get_logger(__name__) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class OrderDialog(Struct): | class Dialog(Struct): | ||||||
|     ''' |     ''' | ||||||
|     Trade dialogue meta-data describing the lifetime |     Trade dialogue meta-data describing the lifetime | ||||||
|     of an order submission to ``emsd`` from a chart. |     of an order submission to ``emsd`` from a chart. | ||||||
|  | @ -141,7 +146,7 @@ class OrderMode: | ||||||
|     current_pp: Optional[PositionTracker] = None |     current_pp: Optional[PositionTracker] = None | ||||||
|     active: bool = False |     active: bool = False | ||||||
|     name: str = 'order' |     name: str = 'order' | ||||||
|     dialogs: dict[str, OrderDialog] = field(default_factory=dict) |     dialogs: dict[str, Dialog] = field(default_factory=dict) | ||||||
| 
 | 
 | ||||||
|     _colors = { |     _colors = { | ||||||
|         'alert': 'alert_yellow', |         'alert': 'alert_yellow', | ||||||
|  | @ -152,10 +157,7 @@ class OrderMode: | ||||||
| 
 | 
 | ||||||
|     def line_from_order( |     def line_from_order( | ||||||
|         self, |         self, | ||||||
| 
 |  | ||||||
|         order: Order, |         order: Order, | ||||||
|         symbol: Symbol, |  | ||||||
| 
 |  | ||||||
|         **line_kwargs, |         **line_kwargs, | ||||||
| 
 | 
 | ||||||
|     ) -> LevelLine: |     ) -> LevelLine: | ||||||
|  | @ -173,8 +175,8 @@ class OrderMode: | ||||||
|             color=self._colors[order.action], |             color=self._colors[order.action], | ||||||
| 
 | 
 | ||||||
|             dotted=True if ( |             dotted=True if ( | ||||||
|                 order.exec_mode == 'dark' and |                 order.exec_mode == 'dark' | ||||||
|                 order.action != 'alert' |                 and order.action != 'alert' | ||||||
|             ) else False, |             ) else False, | ||||||
| 
 | 
 | ||||||
|             **line_kwargs, |             **line_kwargs, | ||||||
|  | @ -236,7 +238,6 @@ class OrderMode: | ||||||
| 
 | 
 | ||||||
|         line = self.line_from_order( |         line = self.line_from_order( | ||||||
|             order, |             order, | ||||||
|             symbol, |  | ||||||
| 
 | 
 | ||||||
|             show_markers=True, |             show_markers=True, | ||||||
|             # just for the stage line to avoid |             # just for the stage line to avoid | ||||||
|  | @ -262,25 +263,28 @@ class OrderMode: | ||||||
| 
 | 
 | ||||||
|     def submit_order( |     def submit_order( | ||||||
|         self, |         self, | ||||||
|  |         send_msg: bool = True, | ||||||
|  |         order: Optional[Order] = None, | ||||||
| 
 | 
 | ||||||
|     ) -> OrderDialog: |     ) -> Dialog: | ||||||
|         ''' |         ''' | ||||||
|         Send execution order to EMS return a level line to |         Send execution order to EMS return a level line to | ||||||
|         represent the order on a chart. |         represent the order on a chart. | ||||||
| 
 | 
 | ||||||
|         ''' |         ''' | ||||||
|  |         if not order: | ||||||
|             staged = self._staged_order |             staged = self._staged_order | ||||||
|         symbol: Symbol = staged.symbol |  | ||||||
|             oid = str(uuid.uuid4()) |             oid = str(uuid.uuid4()) | ||||||
|  |             # symbol: Symbol = staged.symbol | ||||||
| 
 | 
 | ||||||
|             # format order data for ems |             # format order data for ems | ||||||
|             order = staged.copy() |             order = staged.copy() | ||||||
|             order.oid = oid |             order.oid = oid | ||||||
|         order.symbol = symbol.front_fqsn() | 
 | ||||||
|  |         order.symbol = order.symbol.front_fqsn() | ||||||
| 
 | 
 | ||||||
|         line = self.line_from_order( |         line = self.line_from_order( | ||||||
|             order, |             order, | ||||||
|             symbol, |  | ||||||
| 
 | 
 | ||||||
|             show_markers=True, |             show_markers=True, | ||||||
|             only_show_markers_on_hover=True, |             only_show_markers_on_hover=True, | ||||||
|  | @ -298,17 +302,17 @@ class OrderMode: | ||||||
|         # color once the submission ack arrives. |         # color once the submission ack arrives. | ||||||
|         self.lines.submit_line( |         self.lines.submit_line( | ||||||
|             line=line, |             line=line, | ||||||
|             uuid=oid, |             uuid=order.oid, | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|         dialog = OrderDialog( |         dialog = Dialog( | ||||||
|             uuid=oid, |             uuid=order.oid, | ||||||
|             order=order, |             order=order, | ||||||
|             symbol=symbol, |             symbol=order.symbol, | ||||||
|             line=line, |             line=line, | ||||||
|             last_status_close=self.multistatus.open_status( |             last_status_close=self.multistatus.open_status( | ||||||
|                 f'submitting {self._trigger_type}-{order.action}', |                 f'submitting {order.exec_mode}-{order.action}', | ||||||
|                 final_msg=f'submitted {self._trigger_type}-{order.action}', |                 final_msg=f'submitted {order.exec_mode}-{order.action}', | ||||||
|                 clear_on_next=True, |                 clear_on_next=True, | ||||||
|             ) |             ) | ||||||
|         ) |         ) | ||||||
|  | @ -318,14 +322,21 @@ class OrderMode: | ||||||
| 
 | 
 | ||||||
|         # enter submission which will be popped once a response |         # enter submission which will be popped once a response | ||||||
|         # from the EMS is received to move the order to a different# status |         # from the EMS is received to move the order to a different# status | ||||||
|         self.dialogs[oid] = dialog |         self.dialogs[order.oid] = dialog | ||||||
| 
 | 
 | ||||||
|         # hook up mouse drag handlers |         # hook up mouse drag handlers | ||||||
|         line._on_drag_start = self.order_line_modify_start |         line._on_drag_start = self.order_line_modify_start | ||||||
|         line._on_drag_end = self.order_line_modify_complete |         line._on_drag_end = self.order_line_modify_complete | ||||||
| 
 | 
 | ||||||
|         # send order cmd to ems |         # send order cmd to ems | ||||||
|  |         if send_msg: | ||||||
|             self.book.send(order) |             self.book.send(order) | ||||||
|  |         else: | ||||||
|  |             # just register for control over this order | ||||||
|  |             # TODO: some kind of mini-perms system here based on | ||||||
|  |             # an out-of-band tagging/auth sub-sys for multiplayer | ||||||
|  |             # order control? | ||||||
|  |             self.book._sent_orders[order.oid] = order | ||||||
| 
 | 
 | ||||||
|         return dialog |         return dialog | ||||||
| 
 | 
 | ||||||
|  | @ -363,7 +374,7 @@ class OrderMode: | ||||||
|         self, |         self, | ||||||
|         uuid: str |         uuid: str | ||||||
| 
 | 
 | ||||||
|     ) -> OrderDialog: |     ) -> Dialog: | ||||||
|         ''' |         ''' | ||||||
|         Order submitted status event handler. |         Order submitted status event handler. | ||||||
| 
 | 
 | ||||||
|  | @ -418,7 +429,7 @@ class OrderMode: | ||||||
|         self, |         self, | ||||||
| 
 | 
 | ||||||
|         uuid: str, |         uuid: str, | ||||||
|         msg: Dict[str, Any], |         msg: Status, | ||||||
| 
 | 
 | ||||||
|     ) -> None: |     ) -> None: | ||||||
| 
 | 
 | ||||||
|  | @ -442,7 +453,7 @@ class OrderMode: | ||||||
| 
 | 
 | ||||||
|                 # TODO: add in standard fill/exec info that maybe we |                 # TODO: add in standard fill/exec info that maybe we | ||||||
|                 # pack in a broker independent way? |                 # pack in a broker independent way? | ||||||
|                 f'{msg["resp"]}: {msg["trigger_price"]}', |                 f'{msg.resp}: {msg.req.price}', | ||||||
|             ], |             ], | ||||||
|         ) |         ) | ||||||
|         log.runtime(result) |         log.runtime(result) | ||||||
|  | @ -502,7 +513,7 @@ class OrderMode: | ||||||
|                     oid = dialog.uuid |                     oid = dialog.uuid | ||||||
| 
 | 
 | ||||||
|                     cancel_status_close = self.multistatus.open_status( |                     cancel_status_close = self.multistatus.open_status( | ||||||
|                         f'cancelling order {oid[:6]}', |                         f'cancelling order {oid}', | ||||||
|                         group_key=key, |                         group_key=key, | ||||||
|                     ) |                     ) | ||||||
|                     dialog.last_status_close = cancel_status_close |                     dialog.last_status_close = cancel_status_close | ||||||
|  | @ -512,6 +523,45 @@ class OrderMode: | ||||||
| 
 | 
 | ||||||
|         return ids |         return ids | ||||||
| 
 | 
 | ||||||
|  |     def load_unknown_dialog_from_msg( | ||||||
|  |         self, | ||||||
|  |         msg: Status, | ||||||
|  | 
 | ||||||
|  |     ) -> Dialog: | ||||||
|  | 
 | ||||||
|  |         # NOTE: the `.order` attr **must** be set with the | ||||||
|  |         # equivalent order msg in order to be loaded. | ||||||
|  |         order = Order(**msg.req) | ||||||
|  |         oid = str(msg.oid) | ||||||
|  |         symbol = order.symbol | ||||||
|  | 
 | ||||||
|  |         # TODO: MEGA UGGG ZONEEEE! | ||||||
|  |         src = msg.src | ||||||
|  |         if ( | ||||||
|  |             src | ||||||
|  |             and src != 'dark' | ||||||
|  |             and src not in symbol | ||||||
|  |         ): | ||||||
|  |             fqsn = symbol + '.' + src | ||||||
|  |             brokername = src | ||||||
|  |         else: | ||||||
|  |             fqsn = symbol | ||||||
|  |             *head, brokername = fqsn.rsplit('.') | ||||||
|  | 
 | ||||||
|  |         # fill out complex fields | ||||||
|  |         order.oid = str(order.oid) | ||||||
|  |         order.brokers = [brokername] | ||||||
|  |         order.symbol = Symbol.from_fqsn( | ||||||
|  |             fqsn=fqsn, | ||||||
|  |             info={}, | ||||||
|  |         ) | ||||||
|  |         dialog = self.submit_order( | ||||||
|  |             send_msg=False, | ||||||
|  |             order=order, | ||||||
|  |         ) | ||||||
|  |         assert self.dialogs[oid] == dialog | ||||||
|  |         return dialog | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| @asynccontextmanager | @asynccontextmanager | ||||||
| async def open_order_mode( | async def open_order_mode( | ||||||
|  | @ -549,6 +599,7 @@ async def open_order_mode( | ||||||
|             trades_stream, |             trades_stream, | ||||||
|             position_msgs, |             position_msgs, | ||||||
|             brokerd_accounts, |             brokerd_accounts, | ||||||
|  |             ems_dialog_msgs, | ||||||
|         ), |         ), | ||||||
|         trio.open_nursery() as tn, |         trio.open_nursery() as tn, | ||||||
| 
 | 
 | ||||||
|  | @ -596,10 +647,10 @@ async def open_order_mode( | ||||||
| 
 | 
 | ||||||
|                 sym = msg['symbol'] |                 sym = msg['symbol'] | ||||||
|                 if ( |                 if ( | ||||||
|                     sym == symkey or |                     (sym == symkey) or ( | ||||||
|                     # mega-UGH, i think we need to fix the FQSN stuff sooner |                         # mega-UGH, i think we need to fix the FQSN | ||||||
|                     # then later.. |                         # stuff sooner then later.. | ||||||
|                     sym == symkey.removesuffix(f'.{broker}') |                         sym == symkey.removesuffix(f'.{broker}')) | ||||||
|                 ): |                 ): | ||||||
|                     pps_by_account[acctid] = msg |                     pps_by_account[acctid] = msg | ||||||
| 
 | 
 | ||||||
|  | @ -703,7 +754,7 @@ async def open_order_mode( | ||||||
|         order_pane.order_mode = mode |         order_pane.order_mode = mode | ||||||
| 
 | 
 | ||||||
|         # select a pp to track |         # select a pp to track | ||||||
|         tracker = trackers[pp_account] |         tracker: PositionTracker = trackers[pp_account] | ||||||
|         mode.current_pp = tracker |         mode.current_pp = tracker | ||||||
|         tracker.show() |         tracker.show() | ||||||
|         tracker.hide_info() |         tracker.hide_info() | ||||||
|  | @ -755,38 +806,61 @@ async def open_order_mode( | ||||||
|             # to handle input since the ems connection is ready |             # to handle input since the ems connection is ready | ||||||
|             started.set() |             started.set() | ||||||
| 
 | 
 | ||||||
|  |             for oid, msg in ems_dialog_msgs.items(): | ||||||
|  | 
 | ||||||
|  |                 # HACK ALERT: ensure a resp field is filled out since | ||||||
|  |                 # techincally the call below expects a ``Status``. TODO: | ||||||
|  |                 # parse into proper ``Status`` equivalents ems-side? | ||||||
|  |                 # msg.setdefault('resp', msg['broker_details']['resp']) | ||||||
|  |                 # msg.setdefault('oid', msg['broker_details']['oid']) | ||||||
|  |                 msg['brokerd_msg'] = msg | ||||||
|  | 
 | ||||||
|  |                 await process_trade_msg( | ||||||
|  |                     mode, | ||||||
|  |                     book, | ||||||
|  |                     msg, | ||||||
|  |                 ) | ||||||
|  | 
 | ||||||
|             tn.start_soon( |             tn.start_soon( | ||||||
|                 process_trades_and_update_ui, |                 process_trades_and_update_ui, | ||||||
|                 tn, |  | ||||||
|                 feed, |  | ||||||
|                 mode, |  | ||||||
|                 trades_stream, |                 trades_stream, | ||||||
|  |                 mode, | ||||||
|                 book, |                 book, | ||||||
|             ) |             ) | ||||||
|  | 
 | ||||||
|             yield mode |             yield mode | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| async def process_trades_and_update_ui( | async def process_trades_and_update_ui( | ||||||
| 
 | 
 | ||||||
|     n: trio.Nursery, |  | ||||||
|     feed: Feed, |  | ||||||
|     mode: OrderMode, |  | ||||||
|     trades_stream: tractor.MsgStream, |     trades_stream: tractor.MsgStream, | ||||||
|  |     mode: OrderMode, | ||||||
|     book: OrderBook, |     book: OrderBook, | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
| 
 | 
 | ||||||
|     get_index = mode.chart.get_index |  | ||||||
|     global _pnl_tasks |  | ||||||
| 
 |  | ||||||
|     # this is where we receive **back** messages |     # this is where we receive **back** messages | ||||||
|     # about executions **from** the EMS actor |     # about executions **from** the EMS actor | ||||||
|     async for msg in trades_stream: |     async for msg in trades_stream: | ||||||
|  |         await process_trade_msg( | ||||||
|  |             mode, | ||||||
|  |             book, | ||||||
|  |             msg, | ||||||
|  |         ) | ||||||
| 
 | 
 | ||||||
|  | 
 | ||||||
|  | async def process_trade_msg( | ||||||
|  |     mode: OrderMode, | ||||||
|  |     book: OrderBook, | ||||||
|  |     msg: dict, | ||||||
|  | 
 | ||||||
|  | ) -> tuple[Dialog, Status]: | ||||||
|  | 
 | ||||||
|  |     get_index = mode.chart.get_index | ||||||
|     fmsg = pformat(msg) |     fmsg = pformat(msg) | ||||||
|         log.info(f'Received order msg:\n{fmsg}') |     log.debug(f'Received order msg:\n{fmsg}') | ||||||
| 
 |  | ||||||
|     name = msg['name'] |     name = msg['name'] | ||||||
|  | 
 | ||||||
|     if name in ( |     if name in ( | ||||||
|         'position', |         'position', | ||||||
|     ): |     ): | ||||||
|  | @ -811,95 +885,102 @@ async def process_trades_and_update_ui( | ||||||
| 
 | 
 | ||||||
|         # short circuit to next msg to avoid |         # short circuit to next msg to avoid | ||||||
|         # unnecessary msg content lookups |         # unnecessary msg content lookups | ||||||
|             continue |         return | ||||||
| 
 | 
 | ||||||
|         resp = msg['resp'] |     msg = Status(**msg) | ||||||
|         oid = msg['oid'] |     resp = msg.resp | ||||||
|  |     oid = msg.oid | ||||||
|  |     dialog: Dialog = mode.dialogs.get(oid) | ||||||
| 
 | 
 | ||||||
|         dialog = mode.dialogs.get(oid) |     match msg: | ||||||
|         if dialog is None: |         case Status(resp='dark_open' | 'open'): | ||||||
|             log.warning(f'received msg for untracked dialog:\n{fmsg}') |  | ||||||
| 
 |  | ||||||
|             # TODO: enable pure tracking / mirroring of dialogs |  | ||||||
|             # is desired. |  | ||||||
|             continue |  | ||||||
| 
 |  | ||||||
|         # record message to dialog tracking |  | ||||||
|         dialog.msgs[oid] = msg |  | ||||||
| 
 |  | ||||||
|         # response to 'action' request (buy/sell) |  | ||||||
|         if resp in ( |  | ||||||
|             'dark_submitted', |  | ||||||
|             'broker_submitted' |  | ||||||
|         ): |  | ||||||
| 
 | 
 | ||||||
|  |             if dialog is not None: | ||||||
|                 # show line label once order is live |                 # show line label once order is live | ||||||
|                 mode.on_submit(oid) |                 mode.on_submit(oid) | ||||||
| 
 | 
 | ||||||
|         # resp to 'cancel' request or error condition |             else: | ||||||
|         # for action request |                 log.warning( | ||||||
|         elif resp in ( |                     f'received msg for untracked dialog:\n{fmsg}' | ||||||
|             'broker_inactive', |                 ) | ||||||
|             'broker_errored', |                 assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}' | ||||||
|  | 
 | ||||||
|  |                 sym = mode.chart.linked.symbol | ||||||
|  |                 fqsn = sym.front_fqsn() | ||||||
|  |                 order = Order(**msg.req) | ||||||
|  |                 if ( | ||||||
|  |                     ((order.symbol + f'.{msg.src}') == fqsn) | ||||||
|  | 
 | ||||||
|  |                     # a existing dark order for the same symbol | ||||||
|  |                     or ( | ||||||
|  |                         order.symbol == fqsn | ||||||
|  |                         and (msg.src == 'dark') or (msg.src in fqsn) | ||||||
|  |                     ) | ||||||
|                 ): |                 ): | ||||||
|  |                     dialog = mode.load_unknown_dialog_from_msg(msg) | ||||||
|  |                     mode.on_submit(oid) | ||||||
|  |                     # return dialog, msg | ||||||
|  | 
 | ||||||
|  |         case Status(resp='error'): | ||||||
|             # delete level line from view |             # delete level line from view | ||||||
|             mode.on_cancel(oid) |             mode.on_cancel(oid) | ||||||
|             broker_msg = msg['brokerd_msg'] |             broker_msg = msg.brokerd_msg | ||||||
|             log.error( |             log.error( | ||||||
|                 f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' |                 f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|         elif resp in ( |         case Status(resp='canceled'): | ||||||
|             'broker_cancelled', |  | ||||||
|             'dark_cancelled' |  | ||||||
|         ): |  | ||||||
|             # delete level line from view |             # delete level line from view | ||||||
|             mode.on_cancel(oid) |             mode.on_cancel(oid) | ||||||
|             broker_msg = msg['brokerd_msg'] |             req = Order(**msg.req) | ||||||
|             log.cancel( |             log.cancel(f'Canceled {req.action}:{oid}') | ||||||
|                 f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' |  | ||||||
|             ) |  | ||||||
| 
 | 
 | ||||||
|         elif resp in ( |         case Status( | ||||||
|             'dark_triggered' |             resp='triggered', | ||||||
|  |             # req=Order(exec_mode='dark')  # TODO: | ||||||
|  |             req={'exec_mode': 'dark'}, | ||||||
|         ): |         ): | ||||||
|  |             # TODO: UX for a "pending" clear/live order | ||||||
|             log.info(f'Dark order triggered for {fmsg}') |             log.info(f'Dark order triggered for {fmsg}') | ||||||
| 
 | 
 | ||||||
|         elif resp in ( |         case Status( | ||||||
|             'alert_triggered' |             resp='triggered', | ||||||
|  |             # req=Order(exec_mode='live', action='alert') as req, # TODO | ||||||
|  |             req={'exec_mode': 'live', 'action': 'alert'} as req, | ||||||
|         ): |         ): | ||||||
|             # should only be one "fill" for an alert |             # should only be one "fill" for an alert | ||||||
|             # add a triangle and remove the level line |             # add a triangle and remove the level line | ||||||
|  |             req = Order(**req) | ||||||
|             mode.on_fill( |             mode.on_fill( | ||||||
|                 oid, |                 oid, | ||||||
|                 price=msg['trigger_price'], |                 price=req.price, | ||||||
|                 arrow_index=get_index(time.time()), |                 arrow_index=get_index(time.time()), | ||||||
|             ) |             ) | ||||||
|             mode.lines.remove_line(uuid=oid) |             mode.lines.remove_line(uuid=oid) | ||||||
|  |             msg.req = req | ||||||
|             await mode.on_exec(oid, msg) |             await mode.on_exec(oid, msg) | ||||||
| 
 | 
 | ||||||
|         # response to completed 'action' request for buy/sell |         # response to completed 'dialog' for order request | ||||||
|         elif resp in ( |         case Status( | ||||||
|             'broker_executed', |             resp='closed', | ||||||
|  |             # req=Order() as req,  # TODO | ||||||
|  |             req=req, | ||||||
|         ): |         ): | ||||||
|             # right now this is just triggering a system alert |             msg.req = Order(**req) | ||||||
|             await mode.on_exec(oid, msg) |             await mode.on_exec(oid, msg) | ||||||
| 
 |  | ||||||
|             if msg['brokerd_msg']['remaining'] == 0: |  | ||||||
|             mode.lines.remove_line(uuid=oid) |             mode.lines.remove_line(uuid=oid) | ||||||
| 
 | 
 | ||||||
|         # each clearing tick is responded individually |         # each clearing tick is responded individually | ||||||
|         elif resp in ( |         case Status(resp='fill'): | ||||||
|             'broker_filled', |  | ||||||
|         ): |  | ||||||
| 
 | 
 | ||||||
|  |             # handle out-of-piker fills reporting? | ||||||
|             known_order = book._sent_orders.get(oid) |             known_order = book._sent_orders.get(oid) | ||||||
|             if not known_order: |             if not known_order: | ||||||
|                 log.warning(f'order {oid} is unknown') |                 log.warning(f'order {oid} is unknown') | ||||||
|                 continue |                 return | ||||||
| 
 | 
 | ||||||
|             action = known_order.action |             action = known_order.action | ||||||
|             details = msg['brokerd_msg'] |             details = msg.brokerd_msg | ||||||
| 
 | 
 | ||||||
|             # TODO: some kinda progress system |             # TODO: some kinda progress system | ||||||
|             mode.on_fill( |             mode.on_fill( | ||||||
|  | @ -914,3 +995,9 @@ async def process_trades_and_update_ui( | ||||||
|             # TODO: how should we look this up? |             # TODO: how should we look this up? | ||||||
|             # tracker = mode.trackers[msg['account']] |             # tracker = mode.trackers[msg['account']] | ||||||
|             # tracker.live_pp.fills.append(msg) |             # tracker.live_pp.fills.append(msg) | ||||||
|  | 
 | ||||||
|  |     # record message to dialog tracking | ||||||
|  |     if dialog: | ||||||
|  |         dialog.msgs[oid] = msg | ||||||
|  | 
 | ||||||
|  |     return dialog, msg | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue