Compare commits

..

No commits in common. "d1eec24ed5157ce4ae703dda9fd0a5f3b27ae606" and "709269fcf7050763de55159b6a9a930d61b67f47" have entirely different histories.

3 changed files with 25 additions and 55 deletions

View File

@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
for fill in fills:
con: Contract = fill.contract
conid: str = con.conId
pexch: str|None = con.primaryExchange or con.exchange
pexch: str | None = con.primaryExchange
if not pexch:
cons = await client.get_con(conid=conid)

View File

@ -97,11 +97,11 @@ MsgUnion = Union[
]
# class TooFastEdit(Exception):
# 'Edit requests faster then api submissions'
class TooFastEdit(Exception):
'Edit requests faster then api submissions'
# reg_err_types([TooFastEdit])
reg_err_types([TooFastEdit])
# TODO: make this wrap the `api.Client` and `ws` instances
@ -138,12 +138,11 @@ async def handle_order_requests(
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: dict[int, str],
toofastedit: set[int],
) -> None:
'''
`trio.Task` which handles order ctl requests from the EMS and
deliver acks or errors back on that IPC dialog.
Process new order submission requests from the EMS
and deliver acks or errors.
'''
# XXX: UGH, let's unify this.. with ``msgspec``!!!
@ -161,14 +160,8 @@ async def handle_order_requests(
txid = reqids2txids[reqid]
except KeyError:
# XXX: not sure if this block ever gets hit now?
# SEEMS TO on the race case with the update task?
# - update dark order quickly after
# triggered-submitted and then we have inavlid
# value in `reqids2txids` sent over ws.send()??
log.error('TOO FAST CANCEL/EDIT')
# reqids2txids[reqid] = TooFastEdit(reqid)
toofastedit.add(reqid)
reqids2txids[reqid] = reqid
reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
@ -206,10 +199,7 @@ async def handle_order_requests(
# XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT')
# reqids2txids[reqid] = TooFastEdit(reqid)
reqids2txids[reqid] = reqid
toofastedit.add(reqid)
await tractor.pause()
reqids2txids[reqid] = TooFastEdit(reqid)
await ems_order_stream.send(
BrokerdError(
oid=msg['oid'],
@ -675,10 +665,6 @@ async def open_trade_dialog(
token: str = await client.get_ws_token()
# XXX tracks EMS orders which are updated too quickly
# on the emds side with sync-issues on the kraken side.
toofastedit: set[int] = set()
ws: NoBsWs
async with (
ctx.open_stream() as ems_stream,
@ -694,16 +680,15 @@ async def open_trade_dialog(
trio.open_nursery() as tn,
):
# task for processing inbound requests from ems
tn.start_soon(partial(
tn.start_soon(
handle_order_requests,
ws=ws,
client=client,
ems_stream=ems_stream,
apiflows=apiflows,
ids=ids,
reqids2txids=reqids2txids,
toofastedit=toofastedit,
))
ws,
client,
ems_stream,
apiflows,
ids,
reqids2txids,
)
# enter relay loop
await handle_order_updates(
@ -714,7 +699,6 @@ async def open_trade_dialog(
apiflows=apiflows,
ids=ids,
reqids2txids=reqids2txids,
toofastedit=toofastedit,
acnt=acnt,
ledger=ledger,
acctid=acctid,
@ -730,7 +714,6 @@ async def handle_order_updates(
apiflows: OrderDialogs,
ids: bidict[str, int],
reqids2txids: bidict[int, str],
toofastedit: set[int],
acnt: Account,
# transaction records which will be updated
@ -1038,12 +1021,10 @@ async def handle_order_updates(
# <-> ems dialog.
if (
status == 'open'
and
reqid in toofastedit
# isinstance(
# reqids2txids.get(reqid),
# TooFastEdit
# )
and isinstance(
reqids2txids.get(reqid),
TooFastEdit
)
):
# TODO: don't even allow this case
# by not moving the client side line
@ -1205,9 +1186,7 @@ async def handle_order_updates(
txid
# we throttle too-fast-requests on the ems side
and
reqid in toofastedit
# not isinstance(txid, TooFastEdit)
and not isinstance(txid, TooFastEdit)
):
# client was editting too quickly
# so we instead cancel this order

View File

@ -1022,22 +1022,13 @@ async def open_order_mode(
started.set()
for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side?
# msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid'])
ya_msg: dict = msg.setdefault(
'brokerd_msg',
msg,
)
if msg is not ya_msg:
log.warning(
f'A `.brokerd_msg` was already set for ems-dialog msg?\n'
f'oid: {oid!r}\n'
f'ya_msg: {ya_msg!r}\n'
f'msg: {ya_msg!r}\n'
)
msg['brokerd_msg'] = msg
await process_trade_msg(
mode,