diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index b768c9a1..4737d376 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -148,7 +148,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'No account found: `{account}` ?', - ).dict()) + )) continue client = _accounts2clients.get(account) @@ -161,7 +161,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'No api client loaded for account: `{account}` ?', - ).dict()) + )) continue if action in {'buy', 'sell'}: @@ -188,7 +188,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason='Order already active?', - ).dict()) + )) # deliver ack that order has been submitted to broker routing await ems_order_stream.send( @@ -197,9 +197,8 @@ async def handle_order_requests( oid=order.oid, # broker specific request id reqid=reqid, - time_ns=time.time_ns(), account=account, - ).dict() + ) ) elif action == 'cancel': @@ -559,7 +558,7 @@ async def trades_dialogue( cids2pps, validate=True, ) - all_positions.extend(msg.dict() for msg in msgs) + all_positions.extend(msg for msg in msgs) if not all_positions and cids2pps: raise RuntimeError( @@ -665,7 +664,7 @@ async def emit_pp_update( msg = msgs[0] break - await ems_stream.send(msg.dict()) + await ems_stream.send(msg) async def deliver_trade_events( @@ -743,7 +742,7 @@ async def deliver_trade_events( broker_details={'name': 'ib'}, ) - await ems_stream.send(msg.dict()) + await ems_stream.send(msg) case 'fill': @@ -803,7 +802,7 @@ async def deliver_trade_events( broker_time=trade_entry['broker_time'], ) - await ems_stream.send(msg.dict()) + await ems_stream.send(msg) # 2 cases: # - fill comes first or @@ -879,7 +878,7 @@ async def deliver_trade_events( cid, msg = pack_position(item) # acctid = msg.account = accounts_def.inverse[msg.account] # cuck ib and it's shitty fifo sys for pps! - # await ems_stream.send(msg.dict()) + # await ems_stream.send(msg) case 'event': @@ -891,7 +890,7 @@ async def deliver_trade_events( # level... # reqid = item.get('reqid', 0) # if getattr(msg, 'reqid', 0) < -1: - # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + # log.info(f"TWS triggered trade\n{pformat(msg)}") # msg.reqid = 'tws-' + str(-1 * reqid) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index bc89af84..588a0924 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -46,7 +46,7 @@ from piker.clearing._messages import ( BrokerdPosition, BrokerdStatus, ) -from pikerd.data.types import Struct +from piker.data.types import Struct from . import log from .api import ( Client, @@ -110,7 +110,7 @@ async def handle_order_requests( 'https://github.com/pikers/piker/issues/299' ), - ).dict()) + )) continue # validate @@ -136,7 +136,7 @@ async def handle_order_requests( symbol=order.symbol, reason="Failed order submission", broker_details=resp - ).dict() + ) ) else: # TODO: handle multiple orders (cancels?) @@ -161,7 +161,7 @@ async def handle_order_requests( # account the made the order account=order.account - ).dict() + ) ) elif action == 'cancel': @@ -189,7 +189,7 @@ async def handle_order_requests( symbol=msg.symbol, reason="Failed order cancel", broker_details=resp - ).dict() + ) ) if not error: @@ -217,7 +217,7 @@ async def handle_order_requests( # cancels will eventually get cancelled reason="Order cancel is still pending?", broker_details=resp - ).dict() + ) ) else: # order cancel success case. @@ -230,7 +230,7 @@ async def handle_order_requests( status='cancelled', reason='Order cancelled', broker_details={'name': 'kraken'} - ).dict() + ) ) else: log.error(f'Unknown order command: {request_msg}') @@ -330,7 +330,7 @@ async def trades_dialogue( avg_price=p.be_price, currency='', ) - position_msgs.append(msg.dict()) + position_msgs.append(msg) await ctx.started( (position_msgs, [acc_name]) @@ -408,7 +408,7 @@ async def trades_dialogue( broker_details={'name': 'kraken'}, broker_time=broker_time ) - await ems_stream.send(fill_msg.dict()) + await ems_stream.send(fill_msg) filled_msg = BrokerdStatus( reqid=reqid, @@ -432,7 +432,7 @@ async def trades_dialogue( # https://github.com/pikers/piker/issues/296 remaining=0, ) - await ems_stream.send(filled_msg.dict()) + await ems_stream.send(filled_msg) # update ledger and position tracking trans = await update_ledger(acctid, trades) @@ -469,7 +469,7 @@ async def trades_dialogue( # TODO # currency='' ) - await ems_stream.send(pp_msg.dict()) + await ems_stream.send(pp_msg) case [ trades_msgs, diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 837c28bc..91cb94fa 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -58,11 +58,11 @@ class OrderBook: def send( self, - msg: Order, + msg: Order | dict, ) -> dict: self._sent_orders[msg.oid] = msg - self._to_ems.send_nowait(msg.dict()) + self._to_ems.send_nowait(msg) return msg def update( @@ -73,9 +73,8 @@ class OrderBook: ) -> dict: cmd = self._sent_orders[uuid] - msg = cmd.dict() - msg.update(data) - self._sent_orders[uuid] = Order(**msg) + msg = cmd.copy(update=data) + self._sent_orders[uuid] = msg self._to_ems.send_nowait(msg) return cmd @@ -88,7 +87,7 @@ class OrderBook: oid=uuid, symbol=cmd.symbol, ) - self._to_ems.send_nowait(msg.dict()) + self._to_ems.send_nowait(msg) _orders: OrderBook = None @@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code( book = get_orders() async with book._from_order_book.subscribe() as orders_stream: async for cmd in orders_stream: - if cmd['symbol'] == symbol_key: + if cmd.symbol == symbol_key: log.info(f'Send order cmd:\n{pformat(cmd)}') # send msg over IPC / wire await to_ems_stream.send(cmd) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 87b8d348..2b9f50cd 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -231,7 +231,7 @@ async def clear_dark_triggers( price=submit_price, size=cmd['size'], ) - await brokerd_orders_stream.send(msg.dict()) + await brokerd_orders_stream.send(msg) # mark this entry as having sent an order # request. the entry will be replaced once the @@ -247,14 +247,11 @@ async def clear_dark_triggers( msg = Status( oid=oid, # ems order id - resp=resp, time_ns=time.time_ns(), - symbol=fqsn, + resp=resp, trigger_price=price, - broker_details={'name': broker}, - cmd=cmd, # original request message - - ).dict() + brokerd_msg=cmd, + ) # remove exec-condition from set log.info(f'removing pred for {oid}') @@ -577,11 +574,11 @@ async def translate_and_relay_brokerd_events( if name == 'position': - pos_msg = BrokerdPosition(**brokerd_msg).dict() + pos_msg = BrokerdPosition(**brokerd_msg) # XXX: this will be useful for automatic strats yah? # keep pps per account up to date locally in ``emsd`` mem - sym, broker = pos_msg['symbol'], pos_msg['broker'] + sym, broker = pos_msg.symbol, pos_msg.broker relay.positions.setdefault( # NOTE: translate to a FQSN! @@ -672,7 +669,7 @@ async def translate_and_relay_brokerd_events( entry.reqid = reqid # tell broker to cancel immediately - await brokerd_trades_stream.send(entry.dict()) + await brokerd_trades_stream.send(entry) # - the order is now active and will be mirrored in # our book -> registered as live flow @@ -712,7 +709,7 @@ async def translate_and_relay_brokerd_events( # if 10147 in message: cancel resp = 'broker_errored' - broker_details = msg.dict() + broker_details = msg # don't relay message to order requester client # continue @@ -747,7 +744,7 @@ async def translate_and_relay_brokerd_events( resp = 'broker_' + msg.status # pass the BrokerdStatus msg inside the broker details field - broker_details = msg.dict() + broker_details = msg elif name in ( 'fill', @@ -756,7 +753,7 @@ async def translate_and_relay_brokerd_events( # proxy through the "fill" result(s) resp = 'broker_filled' - broker_details = msg.dict() + broker_details = msg log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') @@ -774,7 +771,7 @@ async def translate_and_relay_brokerd_events( time_ns=time.time_ns(), broker_reqid=reqid, brokerd_msg=broker_details, - ).dict() + ) ) except KeyError: log.error( @@ -846,7 +843,7 @@ async def process_client_order_cmds( f'Submitting cancel for live order {reqid}' ) - await brokerd_order_stream.send(msg.dict()) + await brokerd_order_stream.send(msg) else: # this might be a cancel for an order that hasn't been @@ -868,7 +865,7 @@ async def process_client_order_cmds( resp='dark_cancelled', oid=oid, time_ns=time.time_ns(), - ).dict() + ) ) # de-register this client dialogue router.dialogues.pop(oid) @@ -923,7 +920,7 @@ async def process_client_order_cmds( # handle relaying the ems side responses back to # the client/cmd sender from this request log.info(f'Sending live order to {broker}:\n{pformat(msg)}') - await brokerd_order_stream.send(msg.dict()) + await brokerd_order_stream.send(msg) # an immediate response should be ``BrokerdOrderAck`` # with ems order id from the ``trades_dialogue()`` @@ -1003,7 +1000,7 @@ async def process_client_order_cmds( resp=resp, oid=oid, time_ns=time.time_ns(), - ).dict() + ) ) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index cf580876..802dcf46 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -117,7 +117,7 @@ class PaperBoi: reason='paper_trigger', remaining=size, ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) # if we're already a clearing price simulate an immediate fill if ( @@ -173,7 +173,7 @@ class PaperBoi: broker=self.broker, time_ns=time.time_ns(), ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) async def fake_fill( self, @@ -216,7 +216,7 @@ class PaperBoi: 'name': self.broker + '_paper', }, ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) if order_complete: @@ -240,7 +240,7 @@ class PaperBoi: 'name': self.broker, }, ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) # lookup any existing position token = f'{symbol}.{self.broker}' @@ -268,7 +268,7 @@ class PaperBoi: ) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) - await self.ems_trades_stream.send(pp_msg.dict()) + await self.ems_trades_stream.send(pp_msg) async def simulate_fills( @@ -384,7 +384,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'Paper only. No account found: `{account}` ?', - ).dict()) + )) continue # validate @@ -416,7 +416,7 @@ async def handle_order_requests( # broker specific request id reqid=reqid, - ).dict() + ) ) elif action == 'cancel': diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 89e7b2d8..83a0ed48 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -264,7 +264,8 @@ class OrderMode: self, ) -> OrderDialog: - '''Send execution order to EMS return a level line to + ''' + Send execution order to EMS return a level line to represent the order on a chart. ''' @@ -273,13 +274,9 @@ class OrderMode: oid = str(uuid.uuid4()) # format order data for ems - fqsn = symbol.front_fqsn() - order = staged.copy( - update={ - 'symbol': fqsn, - 'oid': oid, - } - ) + order = staged.copy() + order.oid = oid + order.symbol = symbol.front_fqsn() line = self.line_from_order( order,