Drop remaining `BaseModel` api usage from rest of codebase

tractor_typed_msg_hackin
Tyler Goodlet 2022-07-08 10:55:02 -04:00
parent 583fa79e5e
commit de91c2196d
6 changed files with 50 additions and 58 deletions

View File

@ -148,7 +148,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No account found: `{account}` ?', reason=f'No account found: `{account}` ?',
).dict()) ))
continue continue
client = _accounts2clients.get(account) client = _accounts2clients.get(account)
@ -161,7 +161,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'No api client loaded for account: `{account}` ?', reason=f'No api client loaded for account: `{account}` ?',
).dict()) ))
continue continue
if action in {'buy', 'sell'}: if action in {'buy', 'sell'}:
@ -188,7 +188,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason='Order already active?', reason='Order already active?',
).dict()) ))
# deliver ack that order has been submitted to broker routing # deliver ack that order has been submitted to broker routing
await ems_order_stream.send( await ems_order_stream.send(
@ -197,9 +197,8 @@ async def handle_order_requests(
oid=order.oid, oid=order.oid,
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
time_ns=time.time_ns(),
account=account, account=account,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':
@ -559,7 +558,7 @@ async def trades_dialogue(
cids2pps, cids2pps,
validate=True, validate=True,
) )
all_positions.extend(msg.dict() for msg in msgs) all_positions.extend(msg for msg in msgs)
if not all_positions and cids2pps: if not all_positions and cids2pps:
raise RuntimeError( raise RuntimeError(
@ -665,7 +664,7 @@ async def emit_pp_update(
msg = msgs[0] msg = msgs[0]
break break
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
async def deliver_trade_events( async def deliver_trade_events(
@ -743,7 +742,7 @@ async def deliver_trade_events(
broker_details={'name': 'ib'}, broker_details={'name': 'ib'},
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
case 'fill': case 'fill':
@ -803,7 +802,7 @@ async def deliver_trade_events(
broker_time=trade_entry['broker_time'], broker_time=trade_entry['broker_time'],
) )
await ems_stream.send(msg.dict()) await ems_stream.send(msg)
# 2 cases: # 2 cases:
# - fill comes first or # - fill comes first or
@ -879,7 +878,7 @@ async def deliver_trade_events(
cid, msg = pack_position(item) cid, msg = pack_position(item)
# acctid = msg.account = accounts_def.inverse[msg.account] # acctid = msg.account = accounts_def.inverse[msg.account]
# cuck ib and it's shitty fifo sys for pps! # cuck ib and it's shitty fifo sys for pps!
# await ems_stream.send(msg.dict()) # await ems_stream.send(msg)
case 'event': case 'event':
@ -891,7 +890,7 @@ async def deliver_trade_events(
# level... # level...
# reqid = item.get('reqid', 0) # reqid = item.get('reqid', 0)
# if getattr(msg, 'reqid', 0) < -1: # if getattr(msg, 'reqid', 0) < -1:
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}") # log.info(f"TWS triggered trade\n{pformat(msg)}")
# msg.reqid = 'tws-' + str(-1 * reqid) # msg.reqid = 'tws-' + str(-1 * reqid)

View File

@ -184,7 +184,7 @@ async def handle_order_requests(
'Invalid request msg:\n{msg}' 'Invalid request msg:\n{msg}'
), ),
).dict() )
) )
@ -282,7 +282,7 @@ async def trades_dialogue(
avg_price=p.be_price, avg_price=p.be_price,
currency='', currency='',
) )
position_msgs.append(msg.dict()) position_msgs.append(msg)
await ctx.started( await ctx.started(
(position_msgs, [acc_name]) (position_msgs, [acc_name])
@ -405,7 +405,7 @@ async def handle_order_updates(
broker_details={'name': 'kraken'}, broker_details={'name': 'kraken'},
broker_time=broker_time broker_time=broker_time
) )
await ems_stream.send(fill_msg.dict()) await ems_stream.send(fill_msg)
filled_msg = BrokerdStatus( filled_msg = BrokerdStatus(
reqid=reqid, reqid=reqid,
@ -429,7 +429,7 @@ async def handle_order_updates(
# https://github.com/pikers/piker/issues/296 # https://github.com/pikers/piker/issues/296
remaining=0, remaining=0,
) )
await ems_stream.send(filled_msg.dict()) await ems_stream.send(filled_msg)
# update ledger and position tracking # update ledger and position tracking
with open_ledger(acctid, trades) as trans: with open_ledger(acctid, trades) as trans:
@ -466,7 +466,7 @@ async def handle_order_updates(
# TODO # TODO
# currency='' # currency=''
) )
await ems_stream.send(pp_msg.dict()) await ems_stream.send(pp_msg)
# process and relay order state change events # process and relay order state change events
# https://docs.kraken.com/websockets/#message-openOrders # https://docs.kraken.com/websockets/#message-openOrders
@ -563,7 +563,7 @@ async def handle_order_updates(
), ),
) )
msgs.append(resp) msgs.append(resp)
await ems_stream.send(resp.dict()) await ems_stream.send(resp)
case _: case _:
log.warning( log.warning(
@ -603,7 +603,7 @@ async def handle_order_updates(
msgs.extend(resps) msgs.extend(resps)
for resp in resps: for resp in resps:
await ems_stream.send(resp.dict()) await ems_stream.send(resp)
case _: case _:
log.warning(f'Unhandled trades update msg: {msg}') log.warning(f'Unhandled trades update msg: {msg}')

View File

@ -58,11 +58,11 @@ class OrderBook:
def send( def send(
self, self,
msg: Order, msg: Order | dict,
) -> dict: ) -> dict:
self._sent_orders[msg.oid] = msg self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
return msg return msg
def update( def update(
@ -73,9 +73,8 @@ class OrderBook:
) -> dict: ) -> dict:
cmd = self._sent_orders[uuid] cmd = self._sent_orders[uuid]
msg = cmd.dict() msg = cmd.copy(update=data)
msg.update(data) self._sent_orders[uuid] = msg
self._sent_orders[uuid] = Order(**msg)
self._to_ems.send_nowait(msg) self._to_ems.send_nowait(msg)
return cmd return cmd
@ -88,7 +87,7 @@ class OrderBook:
oid=uuid, oid=uuid,
symbol=cmd.symbol, symbol=cmd.symbol,
) )
self._to_ems.send_nowait(msg.dict()) self._to_ems.send_nowait(msg)
_orders: OrderBook = None _orders: OrderBook = None
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
book = get_orders() book = get_orders()
async with book._from_order_book.subscribe() as orders_stream: async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream: async for cmd in orders_stream:
if cmd['symbol'] == symbol_key: if cmd.symbol == symbol_key:
log.info(f'Send order cmd:\n{pformat(cmd)}') log.info(f'Send order cmd:\n{pformat(cmd)}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)

View File

@ -232,7 +232,7 @@ async def clear_dark_triggers(
price=submit_price, price=submit_price,
size=cmd['size'], size=cmd['size'],
) )
await brokerd_orders_stream.send(msg.dict()) await brokerd_orders_stream.send(msg)
# mark this entry as having sent an order # mark this entry as having sent an order
# request. the entry will be replaced once the # request. the entry will be replaced once the
@ -248,14 +248,11 @@ async def clear_dark_triggers(
msg = Status( msg = Status(
oid=oid, # ems order id oid=oid, # ems order id
resp=resp,
time_ns=time.time_ns(), time_ns=time.time_ns(),
symbol=fqsn, resp=resp,
trigger_price=price, trigger_price=price,
broker_details={'name': broker}, brokerd_msg=cmd,
cmd=cmd, # original request message )
).dict()
# remove exec-condition from set # remove exec-condition from set
log.info(f'removing pred for {oid}') log.info(f'removing pred for {oid}')
@ -578,11 +575,11 @@ async def translate_and_relay_brokerd_events(
if name == 'position': if name == 'position':
pos_msg = BrokerdPosition(**brokerd_msg).dict() pos_msg = BrokerdPosition(**brokerd_msg)
# XXX: this will be useful for automatic strats yah? # XXX: this will be useful for automatic strats yah?
# keep pps per account up to date locally in ``emsd`` mem # keep pps per account up to date locally in ``emsd`` mem
sym, broker = pos_msg['symbol'], pos_msg['broker'] sym, broker = pos_msg.symbol, pos_msg.broker
relay.positions.setdefault( relay.positions.setdefault(
# NOTE: translate to a FQSN! # NOTE: translate to a FQSN!
@ -684,7 +681,7 @@ async def translate_and_relay_brokerd_events(
entry.reqid = reqid entry.reqid = reqid
# tell broker to cancel immediately # tell broker to cancel immediately
await brokerd_trades_stream.send(entry.dict()) await brokerd_trades_stream.send(entry)
# - the order is now active and will be mirrored in # - the order is now active and will be mirrored in
# our book -> registered as live flow # our book -> registered as live flow
@ -724,7 +721,7 @@ async def translate_and_relay_brokerd_events(
# if 10147 in message: cancel # if 10147 in message: cancel
resp = 'broker_errored' resp = 'broker_errored'
broker_details = msg.dict() broker_details = msg
# don't relay message to order requester client # don't relay message to order requester client
# continue # continue
@ -759,7 +756,7 @@ async def translate_and_relay_brokerd_events(
resp = 'broker_' + msg.status resp = 'broker_' + msg.status
# pass the BrokerdStatus msg inside the broker details field # pass the BrokerdStatus msg inside the broker details field
broker_details = msg.dict() broker_details = msg
elif name in ( elif name in (
'fill', 'fill',
@ -768,7 +765,7 @@ async def translate_and_relay_brokerd_events(
# proxy through the "fill" result(s) # proxy through the "fill" result(s)
resp = 'broker_filled' resp = 'broker_filled'
broker_details = msg.dict() broker_details = msg
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
@ -786,7 +783,7 @@ async def translate_and_relay_brokerd_events(
time_ns=time.time_ns(), time_ns=time.time_ns(),
broker_reqid=reqid, broker_reqid=reqid,
brokerd_msg=broker_details, brokerd_msg=broker_details,
).dict() )
) )
except KeyError: except KeyError:
log.error( log.error(
@ -858,7 +855,7 @@ async def process_client_order_cmds(
f'Submitting cancel for live order {reqid}' f'Submitting cancel for live order {reqid}'
) )
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
else: else:
# this might be a cancel for an order that hasn't been # this might be a cancel for an order that hasn't been
@ -880,7 +877,7 @@ async def process_client_order_cmds(
resp='dark_cancelled', resp='dark_cancelled',
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )
# de-register this client dialogue # de-register this client dialogue
router.dialogues.pop(oid) router.dialogues.pop(oid)
@ -935,7 +932,7 @@ async def process_client_order_cmds(
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info(f'Sending live order to {broker}:\n{pformat(msg)}') log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
await brokerd_order_stream.send(msg.dict()) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
# with ems order id from the ``trades_dialogue()`` # with ems order id from the ``trades_dialogue()``
@ -1015,7 +1012,7 @@ async def process_client_order_cmds(
resp=resp, resp=resp,
oid=oid, oid=oid,
time_ns=time.time_ns(), time_ns=time.time_ns(),
).dict() )
) )

View File

@ -117,7 +117,7 @@ class PaperBoi:
reason='paper_trigger', reason='paper_trigger',
remaining=size, remaining=size,
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# if we're already a clearing price simulate an immediate fill # if we're already a clearing price simulate an immediate fill
if ( if (
@ -173,7 +173,7 @@ class PaperBoi:
broker=self.broker, broker=self.broker,
time_ns=time.time_ns(), time_ns=time.time_ns(),
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
async def fake_fill( async def fake_fill(
self, self,
@ -216,7 +216,7 @@ class PaperBoi:
'name': self.broker + '_paper', 'name': self.broker + '_paper',
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
if order_complete: if order_complete:
@ -240,7 +240,7 @@ class PaperBoi:
'name': self.broker, 'name': self.broker,
}, },
) )
await self.ems_trades_stream.send(msg.dict()) await self.ems_trades_stream.send(msg)
# lookup any existing position # lookup any existing position
token = f'{symbol}.{self.broker}' token = f'{symbol}.{self.broker}'
@ -268,7 +268,7 @@ class PaperBoi:
) )
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price) pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
await self.ems_trades_stream.send(pp_msg.dict()) await self.ems_trades_stream.send(pp_msg)
async def simulate_fills( async def simulate_fills(
@ -384,7 +384,7 @@ async def handle_order_requests(
oid=request_msg['oid'], oid=request_msg['oid'],
symbol=request_msg['symbol'], symbol=request_msg['symbol'],
reason=f'Paper only. No account found: `{account}` ?', reason=f'Paper only. No account found: `{account}` ?',
).dict()) ))
continue continue
# validate # validate
@ -416,7 +416,7 @@ async def handle_order_requests(
# broker specific request id # broker specific request id
reqid=reqid, reqid=reqid,
).dict() )
) )
elif action == 'cancel': elif action == 'cancel':

View File

@ -264,7 +264,8 @@ class OrderMode:
self, self,
) -> OrderDialog: ) -> OrderDialog:
'''Send execution order to EMS return a level line to '''
Send execution order to EMS return a level line to
represent the order on a chart. represent the order on a chart.
''' '''
@ -273,13 +274,9 @@ class OrderMode:
oid = str(uuid.uuid4()) oid = str(uuid.uuid4())
# format order data for ems # format order data for ems
fqsn = symbol.front_fqsn() order = staged.copy()
order = staged.copy( order.oid = oid
update={ order.symbol = symbol.front_fqsn()
'symbol': fqsn,
'oid': oid,
}
)
line = self.line_from_order( line = self.line_from_order(
order, order,