From de91c2196df2a4f483447a419d09256dbe8d9cae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Jul 2022 10:55:02 -0400 Subject: [PATCH] Drop remaining `BaseModel` api usage from rest of codebase --- piker/brokers/ib/broker.py | 21 ++++++++++----------- piker/brokers/kraken/broker.py | 14 +++++++------- piker/clearing/_client.py | 13 ++++++------- piker/clearing/_ems.py | 33 +++++++++++++++------------------ piker/clearing/_paper_engine.py | 14 +++++++------- piker/ui/order_mode.py | 13 +++++-------- 6 files changed, 50 insertions(+), 58 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index b768c9a1..4737d376 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -148,7 +148,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'No account found: `{account}` ?', - ).dict()) + )) continue client = _accounts2clients.get(account) @@ -161,7 +161,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'No api client loaded for account: `{account}` ?', - ).dict()) + )) continue if action in {'buy', 'sell'}: @@ -188,7 +188,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason='Order already active?', - ).dict()) + )) # deliver ack that order has been submitted to broker routing await ems_order_stream.send( @@ -197,9 +197,8 @@ async def handle_order_requests( oid=order.oid, # broker specific request id reqid=reqid, - time_ns=time.time_ns(), account=account, - ).dict() + ) ) elif action == 'cancel': @@ -559,7 +558,7 @@ async def trades_dialogue( cids2pps, validate=True, ) - all_positions.extend(msg.dict() for msg in msgs) + all_positions.extend(msg for msg in msgs) if not all_positions and cids2pps: raise RuntimeError( @@ -665,7 +664,7 @@ async def emit_pp_update( msg = msgs[0] break - await ems_stream.send(msg.dict()) + await ems_stream.send(msg) async def deliver_trade_events( @@ -743,7 +742,7 @@ async def deliver_trade_events( broker_details={'name': 'ib'}, ) - await ems_stream.send(msg.dict()) + await ems_stream.send(msg) case 'fill': @@ -803,7 +802,7 @@ async def deliver_trade_events( broker_time=trade_entry['broker_time'], ) - await ems_stream.send(msg.dict()) + await ems_stream.send(msg) # 2 cases: # - fill comes first or @@ -879,7 +878,7 @@ async def deliver_trade_events( cid, msg = pack_position(item) # acctid = msg.account = accounts_def.inverse[msg.account] # cuck ib and it's shitty fifo sys for pps! - # await ems_stream.send(msg.dict()) + # await ems_stream.send(msg) case 'event': @@ -891,7 +890,7 @@ async def deliver_trade_events( # level... # reqid = item.get('reqid', 0) # if getattr(msg, 'reqid', 0) < -1: - # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + # log.info(f"TWS triggered trade\n{pformat(msg)}") # msg.reqid = 'tws-' + str(-1 * reqid) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index a0dc1bd1..c0027792 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -184,7 +184,7 @@ async def handle_order_requests( 'Invalid request msg:\n{msg}' ), - ).dict() + ) ) @@ -282,7 +282,7 @@ async def trades_dialogue( avg_price=p.be_price, currency='', ) - position_msgs.append(msg.dict()) + position_msgs.append(msg) await ctx.started( (position_msgs, [acc_name]) @@ -405,7 +405,7 @@ async def handle_order_updates( broker_details={'name': 'kraken'}, broker_time=broker_time ) - await ems_stream.send(fill_msg.dict()) + await ems_stream.send(fill_msg) filled_msg = BrokerdStatus( reqid=reqid, @@ -429,7 +429,7 @@ async def handle_order_updates( # https://github.com/pikers/piker/issues/296 remaining=0, ) - await ems_stream.send(filled_msg.dict()) + await ems_stream.send(filled_msg) # update ledger and position tracking with open_ledger(acctid, trades) as trans: @@ -466,7 +466,7 @@ async def handle_order_updates( # TODO # currency='' ) - await ems_stream.send(pp_msg.dict()) + await ems_stream.send(pp_msg) # process and relay order state change events # https://docs.kraken.com/websockets/#message-openOrders @@ -563,7 +563,7 @@ async def handle_order_updates( ), ) msgs.append(resp) - await ems_stream.send(resp.dict()) + await ems_stream.send(resp) case _: log.warning( @@ -603,7 +603,7 @@ async def handle_order_updates( msgs.extend(resps) for resp in resps: - await ems_stream.send(resp.dict()) + await ems_stream.send(resp) case _: log.warning(f'Unhandled trades update msg: {msg}') diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 837c28bc..91cb94fa 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -58,11 +58,11 @@ class OrderBook: def send( self, - msg: Order, + msg: Order | dict, ) -> dict: self._sent_orders[msg.oid] = msg - self._to_ems.send_nowait(msg.dict()) + self._to_ems.send_nowait(msg) return msg def update( @@ -73,9 +73,8 @@ class OrderBook: ) -> dict: cmd = self._sent_orders[uuid] - msg = cmd.dict() - msg.update(data) - self._sent_orders[uuid] = Order(**msg) + msg = cmd.copy(update=data) + self._sent_orders[uuid] = msg self._to_ems.send_nowait(msg) return cmd @@ -88,7 +87,7 @@ class OrderBook: oid=uuid, symbol=cmd.symbol, ) - self._to_ems.send_nowait(msg.dict()) + self._to_ems.send_nowait(msg) _orders: OrderBook = None @@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code( book = get_orders() async with book._from_order_book.subscribe() as orders_stream: async for cmd in orders_stream: - if cmd['symbol'] == symbol_key: + if cmd.symbol == symbol_key: log.info(f'Send order cmd:\n{pformat(cmd)}') # send msg over IPC / wire await to_ems_stream.send(cmd) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index b35e6640..9b8e0934 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -232,7 +232,7 @@ async def clear_dark_triggers( price=submit_price, size=cmd['size'], ) - await brokerd_orders_stream.send(msg.dict()) + await brokerd_orders_stream.send(msg) # mark this entry as having sent an order # request. the entry will be replaced once the @@ -248,14 +248,11 @@ async def clear_dark_triggers( msg = Status( oid=oid, # ems order id - resp=resp, time_ns=time.time_ns(), - symbol=fqsn, + resp=resp, trigger_price=price, - broker_details={'name': broker}, - cmd=cmd, # original request message - - ).dict() + brokerd_msg=cmd, + ) # remove exec-condition from set log.info(f'removing pred for {oid}') @@ -578,11 +575,11 @@ async def translate_and_relay_brokerd_events( if name == 'position': - pos_msg = BrokerdPosition(**brokerd_msg).dict() + pos_msg = BrokerdPosition(**brokerd_msg) # XXX: this will be useful for automatic strats yah? # keep pps per account up to date locally in ``emsd`` mem - sym, broker = pos_msg['symbol'], pos_msg['broker'] + sym, broker = pos_msg.symbol, pos_msg.broker relay.positions.setdefault( # NOTE: translate to a FQSN! @@ -684,7 +681,7 @@ async def translate_and_relay_brokerd_events( entry.reqid = reqid # tell broker to cancel immediately - await brokerd_trades_stream.send(entry.dict()) + await brokerd_trades_stream.send(entry) # - the order is now active and will be mirrored in # our book -> registered as live flow @@ -724,7 +721,7 @@ async def translate_and_relay_brokerd_events( # if 10147 in message: cancel resp = 'broker_errored' - broker_details = msg.dict() + broker_details = msg # don't relay message to order requester client # continue @@ -759,7 +756,7 @@ async def translate_and_relay_brokerd_events( resp = 'broker_' + msg.status # pass the BrokerdStatus msg inside the broker details field - broker_details = msg.dict() + broker_details = msg elif name in ( 'fill', @@ -768,7 +765,7 @@ async def translate_and_relay_brokerd_events( # proxy through the "fill" result(s) resp = 'broker_filled' - broker_details = msg.dict() + broker_details = msg log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') @@ -786,7 +783,7 @@ async def translate_and_relay_brokerd_events( time_ns=time.time_ns(), broker_reqid=reqid, brokerd_msg=broker_details, - ).dict() + ) ) except KeyError: log.error( @@ -858,7 +855,7 @@ async def process_client_order_cmds( f'Submitting cancel for live order {reqid}' ) - await brokerd_order_stream.send(msg.dict()) + await brokerd_order_stream.send(msg) else: # this might be a cancel for an order that hasn't been @@ -880,7 +877,7 @@ async def process_client_order_cmds( resp='dark_cancelled', oid=oid, time_ns=time.time_ns(), - ).dict() + ) ) # de-register this client dialogue router.dialogues.pop(oid) @@ -935,7 +932,7 @@ async def process_client_order_cmds( # handle relaying the ems side responses back to # the client/cmd sender from this request log.info(f'Sending live order to {broker}:\n{pformat(msg)}') - await brokerd_order_stream.send(msg.dict()) + await brokerd_order_stream.send(msg) # an immediate response should be ``BrokerdOrderAck`` # with ems order id from the ``trades_dialogue()`` @@ -1015,7 +1012,7 @@ async def process_client_order_cmds( resp=resp, oid=oid, time_ns=time.time_ns(), - ).dict() + ) ) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index cf580876..802dcf46 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -117,7 +117,7 @@ class PaperBoi: reason='paper_trigger', remaining=size, ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) # if we're already a clearing price simulate an immediate fill if ( @@ -173,7 +173,7 @@ class PaperBoi: broker=self.broker, time_ns=time.time_ns(), ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) async def fake_fill( self, @@ -216,7 +216,7 @@ class PaperBoi: 'name': self.broker + '_paper', }, ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) if order_complete: @@ -240,7 +240,7 @@ class PaperBoi: 'name': self.broker, }, ) - await self.ems_trades_stream.send(msg.dict()) + await self.ems_trades_stream.send(msg) # lookup any existing position token = f'{symbol}.{self.broker}' @@ -268,7 +268,7 @@ class PaperBoi: ) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) - await self.ems_trades_stream.send(pp_msg.dict()) + await self.ems_trades_stream.send(pp_msg) async def simulate_fills( @@ -384,7 +384,7 @@ async def handle_order_requests( oid=request_msg['oid'], symbol=request_msg['symbol'], reason=f'Paper only. No account found: `{account}` ?', - ).dict()) + )) continue # validate @@ -416,7 +416,7 @@ async def handle_order_requests( # broker specific request id reqid=reqid, - ).dict() + ) ) elif action == 'cancel': diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 89e7b2d8..83a0ed48 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -264,7 +264,8 @@ class OrderMode: self, ) -> OrderDialog: - '''Send execution order to EMS return a level line to + ''' + Send execution order to EMS return a level line to represent the order on a chart. ''' @@ -273,13 +274,9 @@ class OrderMode: oid = str(uuid.uuid4()) # format order data for ems - fqsn = symbol.front_fqsn() - order = staged.copy( - update={ - 'symbol': fqsn, - 'oid': oid, - } - ) + order = staged.copy() + order.oid = oid + order.symbol = symbol.front_fqsn() line = self.line_from_order( order,