Drop remaining `BaseModel` api usage from rest of codebase
parent
3dfe7ef8dd
commit
b250a48b8d
|
@ -148,7 +148,7 @@ async def handle_order_requests(
|
||||||
oid=request_msg['oid'],
|
oid=request_msg['oid'],
|
||||||
symbol=request_msg['symbol'],
|
symbol=request_msg['symbol'],
|
||||||
reason=f'No account found: `{account}` ?',
|
reason=f'No account found: `{account}` ?',
|
||||||
).dict())
|
))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
client = _accounts2clients.get(account)
|
client = _accounts2clients.get(account)
|
||||||
|
@ -161,7 +161,7 @@ async def handle_order_requests(
|
||||||
oid=request_msg['oid'],
|
oid=request_msg['oid'],
|
||||||
symbol=request_msg['symbol'],
|
symbol=request_msg['symbol'],
|
||||||
reason=f'No api client loaded for account: `{account}` ?',
|
reason=f'No api client loaded for account: `{account}` ?',
|
||||||
).dict())
|
))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if action in {'buy', 'sell'}:
|
if action in {'buy', 'sell'}:
|
||||||
|
@ -188,7 +188,7 @@ async def handle_order_requests(
|
||||||
oid=request_msg['oid'],
|
oid=request_msg['oid'],
|
||||||
symbol=request_msg['symbol'],
|
symbol=request_msg['symbol'],
|
||||||
reason='Order already active?',
|
reason='Order already active?',
|
||||||
).dict())
|
))
|
||||||
|
|
||||||
# deliver ack that order has been submitted to broker routing
|
# deliver ack that order has been submitted to broker routing
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
|
@ -197,9 +197,8 @@ async def handle_order_requests(
|
||||||
oid=order.oid,
|
oid=order.oid,
|
||||||
# broker specific request id
|
# broker specific request id
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
time_ns=time.time_ns(),
|
|
||||||
account=account,
|
account=account,
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
elif action == 'cancel':
|
elif action == 'cancel':
|
||||||
|
@ -559,7 +558,7 @@ async def trades_dialogue(
|
||||||
cids2pps,
|
cids2pps,
|
||||||
validate=True,
|
validate=True,
|
||||||
)
|
)
|
||||||
all_positions.extend(msg.dict() for msg in msgs)
|
all_positions.extend(msg for msg in msgs)
|
||||||
|
|
||||||
if not all_positions and cids2pps:
|
if not all_positions and cids2pps:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
|
@ -665,7 +664,7 @@ async def emit_pp_update(
|
||||||
msg = msgs[0]
|
msg = msgs[0]
|
||||||
break
|
break
|
||||||
|
|
||||||
await ems_stream.send(msg.dict())
|
await ems_stream.send(msg)
|
||||||
|
|
||||||
|
|
||||||
async def deliver_trade_events(
|
async def deliver_trade_events(
|
||||||
|
@ -743,7 +742,7 @@ async def deliver_trade_events(
|
||||||
|
|
||||||
broker_details={'name': 'ib'},
|
broker_details={'name': 'ib'},
|
||||||
)
|
)
|
||||||
await ems_stream.send(msg.dict())
|
await ems_stream.send(msg)
|
||||||
|
|
||||||
case 'fill':
|
case 'fill':
|
||||||
|
|
||||||
|
@ -803,7 +802,7 @@ async def deliver_trade_events(
|
||||||
broker_time=trade_entry['broker_time'],
|
broker_time=trade_entry['broker_time'],
|
||||||
|
|
||||||
)
|
)
|
||||||
await ems_stream.send(msg.dict())
|
await ems_stream.send(msg)
|
||||||
|
|
||||||
# 2 cases:
|
# 2 cases:
|
||||||
# - fill comes first or
|
# - fill comes first or
|
||||||
|
@ -879,7 +878,7 @@ async def deliver_trade_events(
|
||||||
cid, msg = pack_position(item)
|
cid, msg = pack_position(item)
|
||||||
# acctid = msg.account = accounts_def.inverse[msg.account]
|
# acctid = msg.account = accounts_def.inverse[msg.account]
|
||||||
# cuck ib and it's shitty fifo sys for pps!
|
# cuck ib and it's shitty fifo sys for pps!
|
||||||
# await ems_stream.send(msg.dict())
|
# await ems_stream.send(msg)
|
||||||
|
|
||||||
case 'event':
|
case 'event':
|
||||||
|
|
||||||
|
@ -891,7 +890,7 @@ async def deliver_trade_events(
|
||||||
# level...
|
# level...
|
||||||
# reqid = item.get('reqid', 0)
|
# reqid = item.get('reqid', 0)
|
||||||
# if getattr(msg, 'reqid', 0) < -1:
|
# if getattr(msg, 'reqid', 0) < -1:
|
||||||
# log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
|
# log.info(f"TWS triggered trade\n{pformat(msg)}")
|
||||||
|
|
||||||
# msg.reqid = 'tws-' + str(-1 * reqid)
|
# msg.reqid = 'tws-' + str(-1 * reqid)
|
||||||
|
|
||||||
|
|
|
@ -191,7 +191,7 @@ async def handle_order_requests(
|
||||||
'Invalid request msg:\n{msg}'
|
'Invalid request msg:\n{msg}'
|
||||||
),
|
),
|
||||||
|
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -289,7 +289,7 @@ async def trades_dialogue(
|
||||||
avg_price=p.be_price,
|
avg_price=p.be_price,
|
||||||
currency='',
|
currency='',
|
||||||
)
|
)
|
||||||
position_msgs.append(msg.dict())
|
position_msgs.append(msg)
|
||||||
|
|
||||||
await ctx.started(
|
await ctx.started(
|
||||||
(position_msgs, [acc_name])
|
(position_msgs, [acc_name])
|
||||||
|
@ -419,7 +419,7 @@ async def handle_order_updates(
|
||||||
broker_details={'name': 'kraken'},
|
broker_details={'name': 'kraken'},
|
||||||
broker_time=broker_time
|
broker_time=broker_time
|
||||||
)
|
)
|
||||||
await ems_stream.send(fill_msg.dict())
|
await ems_stream.send(fill_msg)
|
||||||
|
|
||||||
filled_msg = BrokerdStatus(
|
filled_msg = BrokerdStatus(
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
|
@ -443,7 +443,7 @@ async def handle_order_updates(
|
||||||
# https://github.com/pikers/piker/issues/296
|
# https://github.com/pikers/piker/issues/296
|
||||||
remaining=0,
|
remaining=0,
|
||||||
)
|
)
|
||||||
await ems_stream.send(filled_msg.dict())
|
await ems_stream.send(filled_msg)
|
||||||
|
|
||||||
# update ledger and position tracking
|
# update ledger and position tracking
|
||||||
with open_ledger(acctid, trades) as trans:
|
with open_ledger(acctid, trades) as trans:
|
||||||
|
@ -480,7 +480,7 @@ async def handle_order_updates(
|
||||||
# TODO
|
# TODO
|
||||||
# currency=''
|
# currency=''
|
||||||
)
|
)
|
||||||
await ems_stream.send(pp_msg.dict())
|
await ems_stream.send(pp_msg)
|
||||||
|
|
||||||
# process and relay order state change events
|
# process and relay order state change events
|
||||||
# https://docs.kraken.com/websockets/#message-openOrders
|
# https://docs.kraken.com/websockets/#message-openOrders
|
||||||
|
@ -577,7 +577,7 @@ async def handle_order_updates(
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
msgs.append(resp)
|
msgs.append(resp)
|
||||||
await ems_stream.send(resp.dict())
|
await ems_stream.send(resp)
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.warning(
|
log.warning(
|
||||||
|
|
|
@ -58,11 +58,11 @@ class OrderBook:
|
||||||
|
|
||||||
def send(
|
def send(
|
||||||
self,
|
self,
|
||||||
msg: Order,
|
msg: Order | dict,
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
self._sent_orders[msg.oid] = msg
|
self._sent_orders[msg.oid] = msg
|
||||||
self._to_ems.send_nowait(msg.dict())
|
self._to_ems.send_nowait(msg)
|
||||||
return msg
|
return msg
|
||||||
|
|
||||||
def update(
|
def update(
|
||||||
|
@ -73,9 +73,8 @@ class OrderBook:
|
||||||
|
|
||||||
) -> dict:
|
) -> dict:
|
||||||
cmd = self._sent_orders[uuid]
|
cmd = self._sent_orders[uuid]
|
||||||
msg = cmd.dict()
|
msg = cmd.copy(update=data)
|
||||||
msg.update(data)
|
self._sent_orders[uuid] = msg
|
||||||
self._sent_orders[uuid] = Order(**msg)
|
|
||||||
self._to_ems.send_nowait(msg)
|
self._to_ems.send_nowait(msg)
|
||||||
return cmd
|
return cmd
|
||||||
|
|
||||||
|
@ -88,7 +87,7 @@ class OrderBook:
|
||||||
oid=uuid,
|
oid=uuid,
|
||||||
symbol=cmd.symbol,
|
symbol=cmd.symbol,
|
||||||
)
|
)
|
||||||
self._to_ems.send_nowait(msg.dict())
|
self._to_ems.send_nowait(msg)
|
||||||
|
|
||||||
|
|
||||||
_orders: OrderBook = None
|
_orders: OrderBook = None
|
||||||
|
@ -149,7 +148,7 @@ async def relay_order_cmds_from_sync_code(
|
||||||
book = get_orders()
|
book = get_orders()
|
||||||
async with book._from_order_book.subscribe() as orders_stream:
|
async with book._from_order_book.subscribe() as orders_stream:
|
||||||
async for cmd in orders_stream:
|
async for cmd in orders_stream:
|
||||||
if cmd['symbol'] == symbol_key:
|
if cmd.symbol == symbol_key:
|
||||||
log.info(f'Send order cmd:\n{pformat(cmd)}')
|
log.info(f'Send order cmd:\n{pformat(cmd)}')
|
||||||
# send msg over IPC / wire
|
# send msg over IPC / wire
|
||||||
await to_ems_stream.send(cmd)
|
await to_ems_stream.send(cmd)
|
||||||
|
|
|
@ -232,7 +232,7 @@ async def clear_dark_triggers(
|
||||||
price=submit_price,
|
price=submit_price,
|
||||||
size=cmd['size'],
|
size=cmd['size'],
|
||||||
)
|
)
|
||||||
await brokerd_orders_stream.send(msg.dict())
|
await brokerd_orders_stream.send(msg)
|
||||||
|
|
||||||
# mark this entry as having sent an order
|
# mark this entry as having sent an order
|
||||||
# request. the entry will be replaced once the
|
# request. the entry will be replaced once the
|
||||||
|
@ -248,14 +248,11 @@ async def clear_dark_triggers(
|
||||||
|
|
||||||
msg = Status(
|
msg = Status(
|
||||||
oid=oid, # ems order id
|
oid=oid, # ems order id
|
||||||
resp=resp,
|
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
symbol=fqsn,
|
resp=resp,
|
||||||
trigger_price=price,
|
trigger_price=price,
|
||||||
broker_details={'name': broker},
|
brokerd_msg=cmd,
|
||||||
cmd=cmd, # original request message
|
)
|
||||||
|
|
||||||
).dict()
|
|
||||||
|
|
||||||
# remove exec-condition from set
|
# remove exec-condition from set
|
||||||
log.info(f'removing pred for {oid}')
|
log.info(f'removing pred for {oid}')
|
||||||
|
@ -578,11 +575,11 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
if name == 'position':
|
if name == 'position':
|
||||||
|
|
||||||
pos_msg = BrokerdPosition(**brokerd_msg).dict()
|
pos_msg = BrokerdPosition(**brokerd_msg)
|
||||||
|
|
||||||
# XXX: this will be useful for automatic strats yah?
|
# XXX: this will be useful for automatic strats yah?
|
||||||
# keep pps per account up to date locally in ``emsd`` mem
|
# keep pps per account up to date locally in ``emsd`` mem
|
||||||
sym, broker = pos_msg['symbol'], pos_msg['broker']
|
sym, broker = pos_msg.symbol, pos_msg.broker
|
||||||
|
|
||||||
relay.positions.setdefault(
|
relay.positions.setdefault(
|
||||||
# NOTE: translate to a FQSN!
|
# NOTE: translate to a FQSN!
|
||||||
|
@ -684,7 +681,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
entry.reqid = reqid
|
entry.reqid = reqid
|
||||||
|
|
||||||
# tell broker to cancel immediately
|
# tell broker to cancel immediately
|
||||||
await brokerd_trades_stream.send(entry.dict())
|
await brokerd_trades_stream.send(entry)
|
||||||
|
|
||||||
# - the order is now active and will be mirrored in
|
# - the order is now active and will be mirrored in
|
||||||
# our book -> registered as live flow
|
# our book -> registered as live flow
|
||||||
|
@ -724,7 +721,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
# if 10147 in message: cancel
|
# if 10147 in message: cancel
|
||||||
|
|
||||||
resp = 'broker_errored'
|
resp = 'broker_errored'
|
||||||
broker_details = msg.dict()
|
broker_details = msg
|
||||||
|
|
||||||
# don't relay message to order requester client
|
# don't relay message to order requester client
|
||||||
# continue
|
# continue
|
||||||
|
@ -759,7 +756,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
resp = 'broker_' + msg.status
|
resp = 'broker_' + msg.status
|
||||||
|
|
||||||
# pass the BrokerdStatus msg inside the broker details field
|
# pass the BrokerdStatus msg inside the broker details field
|
||||||
broker_details = msg.dict()
|
broker_details = msg
|
||||||
|
|
||||||
elif name in (
|
elif name in (
|
||||||
'fill',
|
'fill',
|
||||||
|
@ -768,7 +765,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
# proxy through the "fill" result(s)
|
# proxy through the "fill" result(s)
|
||||||
resp = 'broker_filled'
|
resp = 'broker_filled'
|
||||||
broker_details = msg.dict()
|
broker_details = msg
|
||||||
|
|
||||||
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
|
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
|
||||||
|
|
||||||
|
@ -786,7 +783,7 @@ async def translate_and_relay_brokerd_events(
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
broker_reqid=reqid,
|
broker_reqid=reqid,
|
||||||
brokerd_msg=broker_details,
|
brokerd_msg=broker_details,
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.error(
|
log.error(
|
||||||
|
@ -858,7 +855,7 @@ async def process_client_order_cmds(
|
||||||
f'Submitting cancel for live order {reqid}'
|
f'Submitting cancel for live order {reqid}'
|
||||||
)
|
)
|
||||||
|
|
||||||
await brokerd_order_stream.send(msg.dict())
|
await brokerd_order_stream.send(msg)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# this might be a cancel for an order that hasn't been
|
# this might be a cancel for an order that hasn't been
|
||||||
|
@ -880,7 +877,7 @@ async def process_client_order_cmds(
|
||||||
resp='dark_cancelled',
|
resp='dark_cancelled',
|
||||||
oid=oid,
|
oid=oid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
# de-register this client dialogue
|
# de-register this client dialogue
|
||||||
router.dialogues.pop(oid)
|
router.dialogues.pop(oid)
|
||||||
|
@ -935,7 +932,7 @@ async def process_client_order_cmds(
|
||||||
# handle relaying the ems side responses back to
|
# handle relaying the ems side responses back to
|
||||||
# the client/cmd sender from this request
|
# the client/cmd sender from this request
|
||||||
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
|
log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
|
||||||
await brokerd_order_stream.send(msg.dict())
|
await brokerd_order_stream.send(msg)
|
||||||
|
|
||||||
# an immediate response should be ``BrokerdOrderAck``
|
# an immediate response should be ``BrokerdOrderAck``
|
||||||
# with ems order id from the ``trades_dialogue()``
|
# with ems order id from the ``trades_dialogue()``
|
||||||
|
@ -1015,7 +1012,7 @@ async def process_client_order_cmds(
|
||||||
resp=resp,
|
resp=resp,
|
||||||
oid=oid,
|
oid=oid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -117,7 +117,7 @@ class PaperBoi:
|
||||||
reason='paper_trigger',
|
reason='paper_trigger',
|
||||||
remaining=size,
|
remaining=size,
|
||||||
)
|
)
|
||||||
await self.ems_trades_stream.send(msg.dict())
|
await self.ems_trades_stream.send(msg)
|
||||||
|
|
||||||
# if we're already a clearing price simulate an immediate fill
|
# if we're already a clearing price simulate an immediate fill
|
||||||
if (
|
if (
|
||||||
|
@ -173,7 +173,7 @@ class PaperBoi:
|
||||||
broker=self.broker,
|
broker=self.broker,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
)
|
)
|
||||||
await self.ems_trades_stream.send(msg.dict())
|
await self.ems_trades_stream.send(msg)
|
||||||
|
|
||||||
async def fake_fill(
|
async def fake_fill(
|
||||||
self,
|
self,
|
||||||
|
@ -216,7 +216,7 @@ class PaperBoi:
|
||||||
'name': self.broker + '_paper',
|
'name': self.broker + '_paper',
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await self.ems_trades_stream.send(msg.dict())
|
await self.ems_trades_stream.send(msg)
|
||||||
|
|
||||||
if order_complete:
|
if order_complete:
|
||||||
|
|
||||||
|
@ -240,7 +240,7 @@ class PaperBoi:
|
||||||
'name': self.broker,
|
'name': self.broker,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
await self.ems_trades_stream.send(msg.dict())
|
await self.ems_trades_stream.send(msg)
|
||||||
|
|
||||||
# lookup any existing position
|
# lookup any existing position
|
||||||
token = f'{symbol}.{self.broker}'
|
token = f'{symbol}.{self.broker}'
|
||||||
|
@ -268,7 +268,7 @@ class PaperBoi:
|
||||||
)
|
)
|
||||||
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
|
pp_msg.size, pp_msg.avg_price = pp.lifo_update(size, price)
|
||||||
|
|
||||||
await self.ems_trades_stream.send(pp_msg.dict())
|
await self.ems_trades_stream.send(pp_msg)
|
||||||
|
|
||||||
|
|
||||||
async def simulate_fills(
|
async def simulate_fills(
|
||||||
|
@ -384,7 +384,7 @@ async def handle_order_requests(
|
||||||
oid=request_msg['oid'],
|
oid=request_msg['oid'],
|
||||||
symbol=request_msg['symbol'],
|
symbol=request_msg['symbol'],
|
||||||
reason=f'Paper only. No account found: `{account}` ?',
|
reason=f'Paper only. No account found: `{account}` ?',
|
||||||
).dict())
|
))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# validate
|
# validate
|
||||||
|
@ -416,7 +416,7 @@ async def handle_order_requests(
|
||||||
# broker specific request id
|
# broker specific request id
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
|
|
||||||
).dict()
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
elif action == 'cancel':
|
elif action == 'cancel':
|
||||||
|
|
|
@ -264,7 +264,8 @@ class OrderMode:
|
||||||
self,
|
self,
|
||||||
|
|
||||||
) -> OrderDialog:
|
) -> OrderDialog:
|
||||||
'''Send execution order to EMS return a level line to
|
'''
|
||||||
|
Send execution order to EMS return a level line to
|
||||||
represent the order on a chart.
|
represent the order on a chart.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
@ -273,13 +274,9 @@ class OrderMode:
|
||||||
oid = str(uuid.uuid4())
|
oid = str(uuid.uuid4())
|
||||||
|
|
||||||
# format order data for ems
|
# format order data for ems
|
||||||
fqsn = symbol.front_fqsn()
|
order = staged.copy()
|
||||||
order = staged.copy(
|
order.oid = oid
|
||||||
update={
|
order.symbol = symbol.front_fqsn()
|
||||||
'symbol': fqsn,
|
|
||||||
'oid': oid,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
line = self.line_from_order(
|
line = self.line_from_order(
|
||||||
order,
|
order,
|
||||||
|
|
Loading…
Reference in New Issue