Compare commits
	
		
			No commits in common. "2f6e3ad03f64f5f62e0096eb7421ef3d84dd2fcf" and "b52c4092f30bd19d255c2ee153cf3393ba601268" have entirely different histories. 
		
	
	
		
			2f6e3ad03f
			...
			b52c4092f3
		
	
		|  | @ -36,6 +36,8 @@ from trio_typing import TaskStatus | |||
| import tractor | ||||
| from ib_insync.contract import ( | ||||
|     Contract, | ||||
|     # Option, | ||||
|     # Forex, | ||||
| ) | ||||
| from ib_insync.order import ( | ||||
|     Trade, | ||||
|  | @ -362,24 +364,11 @@ async def update_and_audit_msgs( | |||
|                 # presume we're at least not more in the shit then we | ||||
|                 # thought. | ||||
|                 if diff: | ||||
|                     reverse_split_ratio = pikersize / ibsize | ||||
|                     split_ratio = 1/reverse_split_ratio | ||||
| 
 | ||||
|                     if split_ratio >= reverse_split_ratio: | ||||
|                         entry = f'split_ratio = {int(split_ratio)}' | ||||
|                     else: | ||||
|                         entry = f'split_ratio = 1/{int(reverse_split_ratio)}' | ||||
| 
 | ||||
|                     raise ValueError( | ||||
|                         f'POSITION MISMATCH ib <-> piker ledger:\n' | ||||
|                         f'ib: {ibppmsg}\n' | ||||
|                         f'piker: {msg}\n' | ||||
|                         f'reverse_split_ratio: {reverse_split_ratio}\n' | ||||
|                         f'split_ratio: {split_ratio}\n\n' | ||||
|                         'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n' | ||||
|                         'If you are expecting a (reverse) split in this ' | ||||
|                         'instrument you should probably put the following ' | ||||
|                         f'in the `pps.toml` section:\n{entry}' | ||||
|                         'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?' | ||||
|                     ) | ||||
|                     msg.size = ibsize | ||||
| 
 | ||||
|  | @ -543,7 +532,6 @@ async def trades_dialogue( | |||
|                     # sure know which positions to update from the ledger if | ||||
|                     # any are missing from the ``pps.toml`` | ||||
|                     bsuid, msg = pack_position(pos) | ||||
| 
 | ||||
|                     acctid = msg.account = accounts_def.inverse[msg.account] | ||||
|                     acctid = acctid.strip('ib.') | ||||
|                     cids2pps[(acctid, bsuid)] = msg | ||||
|  | @ -583,22 +571,14 @@ async def trades_dialogue( | |||
|                             trans = trans_by_acct.get(acctid) | ||||
|                             if trans: | ||||
|                                 table.update_from_trans(trans) | ||||
|                                 table.update_from_trans(trans) | ||||
| 
 | ||||
|                         # XXX: not sure exactly why it wouldn't be in | ||||
|                         # the updated output (maybe this is a bug?) but | ||||
|                         # if you create a pos from TWS and then load it | ||||
|                         # from the api trades it seems we get a key | ||||
|                         # error from ``update[bsuid]`` ? | ||||
|                         pp = table.pps.get(bsuid) | ||||
|                         if not pp: | ||||
|                             log.error( | ||||
|                                 f'The contract id for {msg} may have ' | ||||
|                                 f'changed to {bsuid}\nYou may need to ' | ||||
|                                 'adjust your ledger for this, skipping ' | ||||
|                                 'for now.' | ||||
|                             ) | ||||
|                             continue | ||||
| 
 | ||||
|                         pp = table.pps[bsuid] | ||||
|                         if msg.size != pp.size: | ||||
|                             log.error( | ||||
|                                 'Position mismatch {pp.symbol.front_fqsn()}:\n' | ||||
|  | @ -750,11 +730,8 @@ async def emit_pp_update( | |||
| _statuses: dict[str, str] = { | ||||
|     'cancelled': 'canceled', | ||||
|     'submitted': 'open', | ||||
|     # XXX: just pass these through? it duplicates actual fill events other | ||||
|     # then the case where you the `.remaining == 0` case which is our | ||||
|     # 'closed'` case. | ||||
|     # 'filled': 'pending', | ||||
|     # 'pendingsubmit': 'pending', | ||||
|     'pendingsubmit': 'pending', | ||||
|     'filled': 'fill', | ||||
| 
 | ||||
|     # TODO: see a current ``ib_insync`` issue around this: | ||||
|     # https://github.com/erdewit/ib_insync/issues/363 | ||||
|  | @ -828,12 +805,8 @@ async def deliver_trade_events( | |||
|                 # cancelling.. gawwwd | ||||
|                 if ib_status_key == 'cancelled': | ||||
|                     last_log = trade.log[-1] | ||||
|                     if ( | ||||
|                         last_log.message | ||||
|                         and 'Error' not in last_log.message | ||||
|                     ): | ||||
|                     if last_log.message: | ||||
|                         ib_status_key = trade.log[-2].status | ||||
|                         print(ib_status_key) | ||||
| 
 | ||||
|                 elif ib_status_key == 'inactive': | ||||
|                     async def sched_cancel(): | ||||
|  | @ -848,16 +821,10 @@ async def deliver_trade_events( | |||
| 
 | ||||
|                     nurse.start_soon(sched_cancel) | ||||
| 
 | ||||
|                 status_key = ( | ||||
|                     _statuses.get(ib_status_key) | ||||
|                     or ib_status_key.lower() | ||||
|                 ) | ||||
|                 status_key = _statuses.get(ib_status_key) or ib_status_key | ||||
| 
 | ||||
|                 remaining = status.remaining | ||||
|                 if ( | ||||
|                     status_key == 'filled' | ||||
|                     and remaining == 0 | ||||
|                 ): | ||||
|                 if remaining == 0: | ||||
|                     status_key = 'closed' | ||||
| 
 | ||||
|                 # skip duplicate filled updates - we get the deats | ||||
|  | @ -1011,18 +978,9 @@ async def deliver_trade_events( | |||
|                 if err['reqid'] == -1: | ||||
|                     log.error(f'TWS external order error:\n{pformat(err)}') | ||||
| 
 | ||||
|                 # TODO: we don't want to relay data feed / lookup errors | ||||
|                 # so we need some further filtering logic here.. | ||||
|                 # for most cases the 'status' block above should take | ||||
|                 # care of this. | ||||
|                 # await ems_stream.send(BrokerdStatus( | ||||
|                 #     status='error', | ||||
|                 #     reqid=err['reqid'], | ||||
|                 #     reason=err['reason'], | ||||
|                 #     time_ns=time.time_ns(), | ||||
|                 #     account=accounts_def.inverse[trade.order.account], | ||||
|                 #     broker_details={'name': 'ib'}, | ||||
|                 # )) | ||||
|                 # TODO: what schema for this msg if we're going to make it | ||||
|                 # portable across all backends? | ||||
|                 # msg = BrokerdError(**err) | ||||
| 
 | ||||
|             case 'position': | ||||
| 
 | ||||
|  |  | |||
|  | @ -101,30 +101,3 @@ def percent_change( | |||
|     new: float, | ||||
| ) -> float: | ||||
|     return pnl(init, new) * 100. | ||||
| 
 | ||||
| 
 | ||||
| def diff_dict( | ||||
|     d1: dict, | ||||
|     d2: dict, | ||||
| 
 | ||||
| ) -> dict: | ||||
|     d1_keys = set(d1.keys()) | ||||
|     d2_keys = set(d2.keys()) | ||||
|     shared_keys = d1_keys.intersection(d2_keys) | ||||
|     shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]} | ||||
|     added_keys = d2_keys - d1_keys | ||||
|     added_deltas = {o: (None, d2[o]) for o in added_keys} | ||||
|     deltas = {**shared_deltas, **added_deltas} | ||||
|     return parse_deltas(deltas) | ||||
| 
 | ||||
| 
 | ||||
| def parse_deltas(deltas: dict) -> dict: | ||||
|     res = {} | ||||
|     for k, v in deltas.items(): | ||||
|         if isinstance(v[0], dict): | ||||
|             tmp = diff_dict(v[0], v[1]) | ||||
|             if tmp: | ||||
|                 res[k] = tmp | ||||
|         else: | ||||
|             res[k] = v[1] | ||||
|     return res | ||||
|  |  | |||
|  | @ -83,13 +83,7 @@ class OrderBook: | |||
|         """Cancel an order (or alert) in the EMS. | ||||
| 
 | ||||
|         """ | ||||
|         cmd = self._sent_orders.get(uuid) | ||||
|         if not cmd: | ||||
|             log.error( | ||||
|                 f'Unknown order {uuid}!?\n' | ||||
|                 f'Maybe there is a stale entry or line?\n' | ||||
|                 f'You should report this as a bug!' | ||||
|             ) | ||||
|         cmd = self._sent_orders[uuid] | ||||
|         msg = Cancel( | ||||
|             oid=uuid, | ||||
|             symbol=cmd.symbol, | ||||
|  | @ -162,10 +156,7 @@ async def relay_order_cmds_from_sync_code( | |||
|                 # send msg over IPC / wire | ||||
|                 await to_ems_stream.send(cmd) | ||||
|             else: | ||||
|                 log.warning( | ||||
|                     f'Ignoring unmatched order cmd for {sym} != {symbol_key}:' | ||||
|                     f'\n{msg}' | ||||
|                 ) | ||||
|                 log.warning(f'Ignoring unmatched order cmd for {sym}: {msg}') | ||||
| 
 | ||||
| 
 | ||||
| @acm | ||||
|  |  | |||
|  | @ -667,19 +667,19 @@ async def translate_and_relay_brokerd_events( | |||
|                 # received this ack, in which case we relay that cancel | ||||
|                 # signal **asap** to the backend broker | ||||
|                 # status = book._active.get(oid) | ||||
|                 status_msg = book._active[oid] | ||||
|                 req = status_msg.req | ||||
|                 status = book._active[oid] | ||||
|                 req = status.req | ||||
|                 if req and req.action == 'cancel': | ||||
|                     # assign newly providerd broker backend request id | ||||
|                     # and tell broker to cancel immediately | ||||
|                     status_msg.reqid = reqid | ||||
|                     status.reqid = reqid | ||||
|                     await brokerd_trades_stream.send(req) | ||||
| 
 | ||||
|                 # 2. the order is now active and will be mirrored in | ||||
|                 # our book -> registered as live flow | ||||
|                 else: | ||||
|                     # TODO: should we relay this ack state? | ||||
|                     status_msg.resp = 'pending' | ||||
|                     status.resp = 'pending' | ||||
| 
 | ||||
|                 # no msg to client necessary | ||||
|                 continue | ||||
|  | @ -729,7 +729,6 @@ async def translate_and_relay_brokerd_events( | |||
|                 # msg-chain/dialog. | ||||
|                 ems_client_order_stream = router.dialogues[oid] | ||||
|                 status_msg = book._active[oid] | ||||
|                 old_resp = status_msg.resp | ||||
|                 status_msg.resp = status | ||||
| 
 | ||||
|                 # retrieve existing live flow | ||||
|  | @ -747,47 +746,16 @@ async def translate_and_relay_brokerd_events( | |||
| 
 | ||||
|                 if status == 'closed': | ||||
|                     log.info(f'Execution for {oid} is complete!') | ||||
| 
 | ||||
|                     # only if we already rxed a fill then probably | ||||
|                     # this clear is fully complete? (frickin ib..) | ||||
|                     if old_resp == 'fill': | ||||
|                     status_msg = book._active.pop(oid) | ||||
| 
 | ||||
|                 elif status == 'canceled': | ||||
|                     log.cancel(f'Cancellation for {oid} is complete!') | ||||
|                     log.info(f'Cancellation for {oid} is complete!') | ||||
| 
 | ||||
|                 else:  # open | ||||
|                     # relayed from backend but probably not handled so | ||||
|                     # just log it | ||||
|                     log.info(f'{broker} opened order {msg}') | ||||
| 
 | ||||
|             # BrokerdFill | ||||
|             case { | ||||
|                 'name': 'fill', | ||||
|                 'reqid': reqid,  # brokerd generated order-request id | ||||
|                 # 'symbol': sym,  # paper engine doesn't have this, nbd? | ||||
|             } if ( | ||||
|                 oid := book._ems2brokerd_ids.inverse.get(reqid) | ||||
|             ): | ||||
|                 # proxy through the "fill" result(s) | ||||
|                 msg = BrokerdFill(**brokerd_msg) | ||||
|                 log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') | ||||
| 
 | ||||
|                 ems_client_order_stream = router.dialogues[oid] | ||||
| 
 | ||||
|                 # wtf a fill can come after 'closed' from ib? | ||||
|                 status_msg = book._active[oid] | ||||
| 
 | ||||
|                 # only if we already rxed a 'closed' | ||||
|                 # this clear is fully complete? (frickin ib..) | ||||
|                 # if status_msg.resp == 'closed': | ||||
|                 #     status_msg = book._active.pop(oid) | ||||
| 
 | ||||
|                 status_msg.resp = 'fill' | ||||
|                 status_msg.reqid = reqid | ||||
|                 status_msg.brokerd_msg = msg | ||||
|                 await ems_client_order_stream.send(status_msg) | ||||
| 
 | ||||
|             # ``Status`` containing an embedded order msg which | ||||
|             # should be loaded as a "pre-existing open order" from the | ||||
|             # brokerd backend. | ||||
|  | @ -843,14 +811,6 @@ async def translate_and_relay_brokerd_events( | |||
|                 # don't fall through | ||||
|                 continue | ||||
| 
 | ||||
|             # brokerd error | ||||
|             case { | ||||
|                 'name': 'status', | ||||
|                 'status': 'error', | ||||
|             }: | ||||
|                 log.error(f'Broker error:\n{pformat(brokerd_msg)}') | ||||
|                 # XXX: we presume the brokerd cancels its own order | ||||
| 
 | ||||
|             # TOO FAST ``BrokerdStatus`` that arrives | ||||
|             # before the ``BrokerdAck``. | ||||
|             case { | ||||
|  | @ -863,20 +823,32 @@ async def translate_and_relay_brokerd_events( | |||
|                 'status': status, | ||||
|                 'reqid': reqid, | ||||
|             }: | ||||
|                 status_msg = book._active[oid] | ||||
|                 log.warning( | ||||
|                     'Unhandled broker status for dialog:\n' | ||||
|                     f'{pformat(status_msg)}\n' | ||||
|                     'Unhandled broker status:\n' | ||||
|                     f'{pformat(brokerd_msg)}\n' | ||||
|                 ) | ||||
| 
 | ||||
|             # BrokerdFill | ||||
|             case { | ||||
|                 'name': 'fill', | ||||
|                 'reqid': reqid,  # brokerd generated order-request id | ||||
|                 # 'symbol': sym,  # paper engine doesn't have this, nbd? | ||||
|             } if ( | ||||
|                 oid := book._ems2brokerd_ids.inverse.get(reqid) | ||||
|             ): | ||||
|                 # proxy through the "fill" result(s) | ||||
|                 log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}') | ||||
|                 msg = BrokerdFill(**brokerd_msg) | ||||
|                 ems_client_order_stream = router.dialogues[oid] | ||||
|                 status_msg = book._active[oid] | ||||
|                 status_msg.resp = 'fill' | ||||
|                 status_msg.reqid = reqid | ||||
|                 status_msg.brokerd_msg = msg | ||||
|                 await ems_client_order_stream.send(status_msg) | ||||
| 
 | ||||
|             case _: | ||||
|                 raise ValueError(f'Brokerd message {brokerd_msg} is invalid') | ||||
| 
 | ||||
|         # XXX: ugh sometimes we don't access it? | ||||
|         if status_msg: | ||||
|             del status_msg | ||||
| 
 | ||||
|     # TODO: do we want this to keep things cleaned up? | ||||
|     # it might require a special status from brokerd to affirm the | ||||
|     # flow is complete? | ||||
|  |  | |||
|  | @ -234,7 +234,6 @@ class BrokerdStatus(Struct): | |||
|         'canceled', | ||||
|         'fill', | ||||
|         'pending', | ||||
|         'error', | ||||
|     ] | ||||
| 
 | ||||
|     account: str | ||||
|  |  | |||
|  | @ -37,7 +37,7 @@ import time | |||
| from math import isnan | ||||
| 
 | ||||
| from bidict import bidict | ||||
| from msgspec.msgpack import encode, decode | ||||
| import msgpack | ||||
| import pyqtgraph as pg | ||||
| import numpy as np | ||||
| import tractor | ||||
|  | @ -774,13 +774,12 @@ async def stream_quotes( | |||
|     async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: | ||||
|         # send subs topics to server | ||||
|         resp = await ws.send_message( | ||||
| 
 | ||||
|             encode({'streams': list(tbks.values())}) | ||||
|             msgpack.dumps({'streams': list(tbks.values())}) | ||||
|         ) | ||||
|         log.info(resp) | ||||
| 
 | ||||
|         async def recv() -> dict[str, Any]: | ||||
|             return decode((await ws.get_message()), encoding='utf-8') | ||||
|             return msgpack.loads((await ws.get_message()), encoding='utf-8') | ||||
| 
 | ||||
|         streams = (await recv())['streams'] | ||||
|         log.info(f"Subscribed to {streams}") | ||||
|  |  | |||
|  | @ -18,7 +18,6 @@ | |||
| Built-in (extension) types. | ||||
| 
 | ||||
| """ | ||||
| import sys | ||||
| from typing import Optional | ||||
| from pprint import pformat | ||||
| 
 | ||||
|  | @ -43,13 +42,8 @@ class Struct( | |||
|         } | ||||
| 
 | ||||
|     def __repr__(self): | ||||
|         # only turn on pprint when we detect a python REPL | ||||
|         # at runtime B) | ||||
|         if hasattr(sys, 'ps1'): | ||||
|         return f'Struct({pformat(self.to_dict())})' | ||||
| 
 | ||||
|         return super().__repr__() | ||||
| 
 | ||||
|     def copy( | ||||
|         self, | ||||
|         update: Optional[dict] = None, | ||||
|  |  | |||
							
								
								
									
										19
									
								
								piker/pp.py
								
								
								
								
							
							
						
						
									
										19
									
								
								piker/pp.py
								
								
								
								
							|  | @ -134,8 +134,6 @@ class Position(Struct): | |||
|     # unique backend symbol id | ||||
|     bsuid: str | ||||
| 
 | ||||
|     split_ratio: Optional[int] = None | ||||
| 
 | ||||
|     # ordered record of known constituent trade messages | ||||
|     clears: dict[ | ||||
|         Union[str, int, Status],  # trade id | ||||
|  | @ -161,9 +159,6 @@ class Position(Struct): | |||
|         clears = d.pop('clears') | ||||
|         expiry = d.pop('expiry') | ||||
| 
 | ||||
|         if self.split_ratio is None: | ||||
|             d.pop('split_ratio') | ||||
| 
 | ||||
|         # TODO: we need to figure out how to have one top level | ||||
|         # listing venue here even when the backend isn't providing | ||||
|         # it via the trades ledger.. | ||||
|  | @ -389,22 +384,12 @@ class Position(Struct): | |||
|                 asize_h.append(accum_size) | ||||
|                 ppu_h.append(ppu_h[-1]) | ||||
| 
 | ||||
|         final_ppu = ppu_h[-1] if ppu_h else 0 | ||||
| 
 | ||||
|         # handle any split info entered (for now) manually by user | ||||
|         if self.split_ratio is not None: | ||||
|             final_ppu /= self.split_ratio | ||||
| 
 | ||||
|         return final_ppu | ||||
|         return ppu_h[-1] if ppu_h else 0 | ||||
| 
 | ||||
|     def calc_size(self) -> float: | ||||
|         size: float = 0 | ||||
|         for tid, entry in self.clears.items(): | ||||
|             size += entry['size'] | ||||
| 
 | ||||
|         if self.split_ratio is not None: | ||||
|             size = round(size * self.split_ratio) | ||||
| 
 | ||||
|         return size | ||||
| 
 | ||||
|     def minimize_clears( | ||||
|  | @ -863,7 +848,6 @@ def open_pps( | |||
|         size = entry['size'] | ||||
|         # TODO: remove but, handle old field name for now | ||||
|         ppu = entry.get('ppu', entry.get('be_price', 0)) | ||||
|         split_ratio = entry.get('split_ratio') | ||||
| 
 | ||||
|         expiry = entry.get('expiry') | ||||
|         if expiry: | ||||
|  | @ -873,7 +857,6 @@ def open_pps( | |||
|             Symbol.from_fqsn(fqsn, info={}), | ||||
|             size=size, | ||||
|             ppu=ppu, | ||||
|             split_ratio=split_ratio, | ||||
|             expiry=expiry, | ||||
|             bsuid=entry['bsuid'], | ||||
| 
 | ||||
|  |  | |||
|  | @ -163,6 +163,7 @@ class OrderMode: | |||
|     ) -> LevelLine: | ||||
| 
 | ||||
|         level = order.price | ||||
|         print(f'SIZE: {order.size}') | ||||
|         line = order_line( | ||||
| 
 | ||||
|             self.chart, | ||||
|  | @ -858,7 +859,7 @@ async def process_trade_msg( | |||
| 
 | ||||
|     get_index = mode.chart.get_index | ||||
|     fmsg = pformat(msg) | ||||
|     log.debug(f'Received order msg:\n{fmsg}') | ||||
|     log.info(f'Received order msg:\n{fmsg}') | ||||
|     name = msg['name'] | ||||
| 
 | ||||
|     if name in ( | ||||
|  | @ -919,7 +920,6 @@ async def process_trade_msg( | |||
|                 ): | ||||
|                     dialog = mode.load_unknown_dialog_from_msg(msg) | ||||
|                     mode.on_submit(oid) | ||||
|                     # return dialog, msg | ||||
| 
 | ||||
|         case Status(resp='error'): | ||||
|             # delete level line from view | ||||
|  | @ -932,15 +932,16 @@ async def process_trade_msg( | |||
|         case Status(resp='canceled'): | ||||
|             # delete level line from view | ||||
|             mode.on_cancel(oid) | ||||
|             req = Order(**msg.req) | ||||
|             log.cancel(f'Canceled {req.action}:{oid}') | ||||
|             req = msg.req | ||||
|             log.cancel( | ||||
|                 f'Canceled order {oid}:\n{pformat(req)}' | ||||
|             ) | ||||
| 
 | ||||
|         case Status( | ||||
|             resp='triggered', | ||||
|             # req=Order(exec_mode='dark')  # TODO: | ||||
|             req={'exec_mode': 'dark'}, | ||||
|         ): | ||||
|             # TODO: UX for a "pending" clear/live order | ||||
|             log.info(f'Dark order triggered for {fmsg}') | ||||
| 
 | ||||
|         case Status( | ||||
|  | @ -950,14 +951,13 @@ async def process_trade_msg( | |||
|         ): | ||||
|             # should only be one "fill" for an alert | ||||
|             # add a triangle and remove the level line | ||||
|             req = Order(**req) | ||||
|             mode.on_fill( | ||||
|                 oid, | ||||
|                 price=req.price, | ||||
|                 arrow_index=get_index(time.time()), | ||||
|             ) | ||||
|             mode.lines.remove_line(uuid=oid) | ||||
|             msg.req = req | ||||
|             msg.req = Order(**req) | ||||
|             await mode.on_exec(oid, msg) | ||||
| 
 | ||||
|         # response to completed 'dialog' for order request | ||||
|  | @ -966,18 +966,18 @@ async def process_trade_msg( | |||
|             # req=Order() as req,  # TODO | ||||
|             req=req, | ||||
|         ): | ||||
|             # right now this is just triggering a system alert | ||||
|             msg.req = Order(**req) | ||||
|             await mode.on_exec(oid, msg) | ||||
|             mode.lines.remove_line(uuid=oid) | ||||
| 
 | ||||
|         # each clearing tick is responded individually | ||||
|         case Status(resp='fill'): | ||||
| 
 | ||||
|             # handle out-of-piker fills reporting? | ||||
|             known_order = book._sent_orders.get(oid) | ||||
|             if not known_order: | ||||
|                 log.warning(f'order {oid} is unknown') | ||||
|                 return | ||||
|                 # continue | ||||
| 
 | ||||
|             action = known_order.action | ||||
|             details = msg.brokerd_msg | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue