Compare commits

..

No commits in common. "d1eec24ed5157ce4ae703dda9fd0a5f3b27ae606" and "709269fcf7050763de55159b6a9a930d61b67f47" have entirely different histories.

3 changed files with 25 additions and 55 deletions

View File

@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
for fill in fills: for fill in fills:
con: Contract = fill.contract con: Contract = fill.contract
conid: str = con.conId conid: str = con.conId
pexch: str|None = con.primaryExchange or con.exchange pexch: str | None = con.primaryExchange
if not pexch: if not pexch:
cons = await client.get_con(conid=conid) cons = await client.get_con(conid=conid)

View File

@ -97,11 +97,11 @@ MsgUnion = Union[
] ]
# class TooFastEdit(Exception): class TooFastEdit(Exception):
# 'Edit requests faster then api submissions' 'Edit requests faster then api submissions'
# reg_err_types([TooFastEdit]) reg_err_types([TooFastEdit])
# TODO: make this wrap the `api.Client` and `ws` instances # TODO: make this wrap the `api.Client` and `ws` instances
@ -138,12 +138,11 @@ async def handle_order_requests(
apiflows: OrderDialogs, apiflows: OrderDialogs,
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: dict[int, str], reqids2txids: dict[int, str],
toofastedit: set[int],
) -> None: ) -> None:
''' '''
`trio.Task` which handles order ctl requests from the EMS and Process new order submission requests from the EMS
deliver acks or errors back on that IPC dialog. and deliver acks or errors.
''' '''
# XXX: UGH, let's unify this.. with ``msgspec``!!! # XXX: UGH, let's unify this.. with ``msgspec``!!!
@ -161,14 +160,8 @@ async def handle_order_requests(
txid = reqids2txids[reqid] txid = reqids2txids[reqid]
except KeyError: except KeyError:
# XXX: not sure if this block ever gets hit now? # XXX: not sure if this block ever gets hit now?
# SEEMS TO on the race case with the update task?
# - update dark order quickly after
# triggered-submitted and then we have inavlid
# value in `reqids2txids` sent over ws.send()??
log.error('TOO FAST CANCEL/EDIT') log.error('TOO FAST CANCEL/EDIT')
# reqids2txids[reqid] = TooFastEdit(reqid) reqids2txids[reqid] = TooFastEdit(reqid)
toofastedit.add(reqid)
reqids2txids[reqid] = reqid
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg['oid'], oid=msg['oid'],
@ -206,10 +199,7 @@ async def handle_order_requests(
# XXX: not sure if this block ever gets hit now? # XXX: not sure if this block ever gets hit now?
log.error('TOO FAST EDIT') log.error('TOO FAST EDIT')
# reqids2txids[reqid] = TooFastEdit(reqid) reqids2txids[reqid] = TooFastEdit(reqid)
reqids2txids[reqid] = reqid
toofastedit.add(reqid)
await tractor.pause()
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
oid=msg['oid'], oid=msg['oid'],
@ -675,10 +665,6 @@ async def open_trade_dialog(
token: str = await client.get_ws_token() token: str = await client.get_ws_token()
# XXX tracks EMS orders which are updated too quickly
# on the emds side with sync-issues on the kraken side.
toofastedit: set[int] = set()
ws: NoBsWs ws: NoBsWs
async with ( async with (
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
@ -694,16 +680,15 @@ async def open_trade_dialog(
trio.open_nursery() as tn, trio.open_nursery() as tn,
): ):
# task for processing inbound requests from ems # task for processing inbound requests from ems
tn.start_soon(partial( tn.start_soon(
handle_order_requests, handle_order_requests,
ws=ws, ws,
client=client, client,
ems_stream=ems_stream, ems_stream,
apiflows=apiflows, apiflows,
ids=ids, ids,
reqids2txids=reqids2txids, reqids2txids,
toofastedit=toofastedit, )
))
# enter relay loop # enter relay loop
await handle_order_updates( await handle_order_updates(
@ -714,7 +699,6 @@ async def open_trade_dialog(
apiflows=apiflows, apiflows=apiflows,
ids=ids, ids=ids,
reqids2txids=reqids2txids, reqids2txids=reqids2txids,
toofastedit=toofastedit,
acnt=acnt, acnt=acnt,
ledger=ledger, ledger=ledger,
acctid=acctid, acctid=acctid,
@ -730,7 +714,6 @@ async def handle_order_updates(
apiflows: OrderDialogs, apiflows: OrderDialogs,
ids: bidict[str, int], ids: bidict[str, int],
reqids2txids: bidict[int, str], reqids2txids: bidict[int, str],
toofastedit: set[int],
acnt: Account, acnt: Account,
# transaction records which will be updated # transaction records which will be updated
@ -1038,12 +1021,10 @@ async def handle_order_updates(
# <-> ems dialog. # <-> ems dialog.
if ( if (
status == 'open' status == 'open'
and and isinstance(
reqid in toofastedit reqids2txids.get(reqid),
# isinstance( TooFastEdit
# reqids2txids.get(reqid), )
# TooFastEdit
# )
): ):
# TODO: don't even allow this case # TODO: don't even allow this case
# by not moving the client side line # by not moving the client side line
@ -1205,9 +1186,7 @@ async def handle_order_updates(
txid txid
# we throttle too-fast-requests on the ems side # we throttle too-fast-requests on the ems side
and and not isinstance(txid, TooFastEdit)
reqid in toofastedit
# not isinstance(txid, TooFastEdit)
): ):
# client was editting too quickly # client was editting too quickly
# so we instead cancel this order # so we instead cancel this order

View File

@ -1022,22 +1022,13 @@ async def open_order_mode(
started.set() started.set()
for oid, msg in ems_dialog_msgs.items(): for oid, msg in ems_dialog_msgs.items():
# HACK ALERT: ensure a resp field is filled out since # HACK ALERT: ensure a resp field is filled out since
# techincally the call below expects a ``Status``. TODO: # techincally the call below expects a ``Status``. TODO:
# parse into proper ``Status`` equivalents ems-side? # parse into proper ``Status`` equivalents ems-side?
# msg.setdefault('resp', msg['broker_details']['resp']) # msg.setdefault('resp', msg['broker_details']['resp'])
# msg.setdefault('oid', msg['broker_details']['oid']) # msg.setdefault('oid', msg['broker_details']['oid'])
ya_msg: dict = msg.setdefault( msg['brokerd_msg'] = msg
'brokerd_msg',
msg,
)
if msg is not ya_msg:
log.warning(
f'A `.brokerd_msg` was already set for ems-dialog msg?\n'
f'oid: {oid!r}\n'
f'ya_msg: {ya_msg!r}\n'
f'msg: {ya_msg!r}\n'
)
await process_trade_msg( await process_trade_msg(
mode, mode,