Compare commits
3 Commits
709269fcf7
...
d1eec24ed5
| Author | SHA1 | Date |
|---|---|---|
|
|
d1eec24ed5 | |
|
|
d06bbec24f | |
|
|
8cefc1bdf8 |
|
|
@ -501,7 +501,7 @@ async def update_ledger_from_api_trades(
|
||||||
for fill in fills:
|
for fill in fills:
|
||||||
con: Contract = fill.contract
|
con: Contract = fill.contract
|
||||||
conid: str = con.conId
|
conid: str = con.conId
|
||||||
pexch: str | None = con.primaryExchange
|
pexch: str|None = con.primaryExchange or con.exchange
|
||||||
|
|
||||||
if not pexch:
|
if not pexch:
|
||||||
cons = await client.get_con(conid=conid)
|
cons = await client.get_con(conid=conid)
|
||||||
|
|
|
||||||
|
|
@ -97,11 +97,11 @@ MsgUnion = Union[
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class TooFastEdit(Exception):
|
# class TooFastEdit(Exception):
|
||||||
'Edit requests faster then api submissions'
|
# 'Edit requests faster then api submissions'
|
||||||
|
|
||||||
|
|
||||||
reg_err_types([TooFastEdit])
|
# reg_err_types([TooFastEdit])
|
||||||
|
|
||||||
|
|
||||||
# TODO: make this wrap the `api.Client` and `ws` instances
|
# TODO: make this wrap the `api.Client` and `ws` instances
|
||||||
|
|
@ -138,15 +138,16 @@ async def handle_order_requests(
|
||||||
apiflows: OrderDialogs,
|
apiflows: OrderDialogs,
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
reqids2txids: dict[int, str],
|
reqids2txids: dict[int, str],
|
||||||
|
toofastedit: set[int],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
Process new order submission requests from the EMS
|
`trio.Task` which handles order ctl requests from the EMS and
|
||||||
and deliver acks or errors.
|
deliver acks or errors back on that IPC dialog.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# XXX: UGH, let's unify this.. with ``msgspec``!!!
|
# XXX: UGH, let's unify this.. with ``msgspec``!!!
|
||||||
msg: dict | Order
|
msg: dict|Order
|
||||||
async for msg in ems_order_stream:
|
async for msg in ems_order_stream:
|
||||||
log.info(f'Rx order msg:\n{pformat(msg)}')
|
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||||
match msg:
|
match msg:
|
||||||
|
|
@ -160,8 +161,14 @@ async def handle_order_requests(
|
||||||
txid = reqids2txids[reqid]
|
txid = reqids2txids[reqid]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
# XXX: not sure if this block ever gets hit now?
|
# XXX: not sure if this block ever gets hit now?
|
||||||
|
# SEEMS TO on the race case with the update task?
|
||||||
|
# - update dark order quickly after
|
||||||
|
# triggered-submitted and then we have inavlid
|
||||||
|
# value in `reqids2txids` sent over ws.send()??
|
||||||
log.error('TOO FAST CANCEL/EDIT')
|
log.error('TOO FAST CANCEL/EDIT')
|
||||||
reqids2txids[reqid] = TooFastEdit(reqid)
|
# reqids2txids[reqid] = TooFastEdit(reqid)
|
||||||
|
toofastedit.add(reqid)
|
||||||
|
reqids2txids[reqid] = reqid
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdError(
|
BrokerdError(
|
||||||
oid=msg['oid'],
|
oid=msg['oid'],
|
||||||
|
|
@ -199,13 +206,16 @@ async def handle_order_requests(
|
||||||
|
|
||||||
# XXX: not sure if this block ever gets hit now?
|
# XXX: not sure if this block ever gets hit now?
|
||||||
log.error('TOO FAST EDIT')
|
log.error('TOO FAST EDIT')
|
||||||
reqids2txids[reqid] = TooFastEdit(reqid)
|
# reqids2txids[reqid] = TooFastEdit(reqid)
|
||||||
|
reqids2txids[reqid] = reqid
|
||||||
|
toofastedit.add(reqid)
|
||||||
|
await tractor.pause()
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdError(
|
BrokerdError(
|
||||||
oid=msg['oid'],
|
oid=msg['oid'],
|
||||||
symbol=msg['symbol'],
|
symbol=msg['symbol'],
|
||||||
reason=(
|
reason=(
|
||||||
f'TooFastEdit reqid:{reqid}, cancelling..'
|
f'TooFastEdit reqid: {reqid}, cancelling..'
|
||||||
),
|
),
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
@ -665,6 +675,10 @@ async def open_trade_dialog(
|
||||||
|
|
||||||
token: str = await client.get_ws_token()
|
token: str = await client.get_ws_token()
|
||||||
|
|
||||||
|
# XXX tracks EMS orders which are updated too quickly
|
||||||
|
# on the emds side with sync-issues on the kraken side.
|
||||||
|
toofastedit: set[int] = set()
|
||||||
|
|
||||||
ws: NoBsWs
|
ws: NoBsWs
|
||||||
async with (
|
async with (
|
||||||
ctx.open_stream() as ems_stream,
|
ctx.open_stream() as ems_stream,
|
||||||
|
|
@ -680,15 +694,16 @@ async def open_trade_dialog(
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
):
|
):
|
||||||
# task for processing inbound requests from ems
|
# task for processing inbound requests from ems
|
||||||
tn.start_soon(
|
tn.start_soon(partial(
|
||||||
handle_order_requests,
|
handle_order_requests,
|
||||||
ws,
|
ws=ws,
|
||||||
client,
|
client=client,
|
||||||
ems_stream,
|
ems_stream=ems_stream,
|
||||||
apiflows,
|
apiflows=apiflows,
|
||||||
ids,
|
ids=ids,
|
||||||
reqids2txids,
|
reqids2txids=reqids2txids,
|
||||||
)
|
toofastedit=toofastedit,
|
||||||
|
))
|
||||||
|
|
||||||
# enter relay loop
|
# enter relay loop
|
||||||
await handle_order_updates(
|
await handle_order_updates(
|
||||||
|
|
@ -699,6 +714,7 @@ async def open_trade_dialog(
|
||||||
apiflows=apiflows,
|
apiflows=apiflows,
|
||||||
ids=ids,
|
ids=ids,
|
||||||
reqids2txids=reqids2txids,
|
reqids2txids=reqids2txids,
|
||||||
|
toofastedit=toofastedit,
|
||||||
acnt=acnt,
|
acnt=acnt,
|
||||||
ledger=ledger,
|
ledger=ledger,
|
||||||
acctid=acctid,
|
acctid=acctid,
|
||||||
|
|
@ -714,6 +730,7 @@ async def handle_order_updates(
|
||||||
apiflows: OrderDialogs,
|
apiflows: OrderDialogs,
|
||||||
ids: bidict[str, int],
|
ids: bidict[str, int],
|
||||||
reqids2txids: bidict[int, str],
|
reqids2txids: bidict[int, str],
|
||||||
|
toofastedit: set[int],
|
||||||
acnt: Account,
|
acnt: Account,
|
||||||
|
|
||||||
# transaction records which will be updated
|
# transaction records which will be updated
|
||||||
|
|
@ -1021,10 +1038,12 @@ async def handle_order_updates(
|
||||||
# <-> ems dialog.
|
# <-> ems dialog.
|
||||||
if (
|
if (
|
||||||
status == 'open'
|
status == 'open'
|
||||||
and isinstance(
|
and
|
||||||
reqids2txids.get(reqid),
|
reqid in toofastedit
|
||||||
TooFastEdit
|
# isinstance(
|
||||||
)
|
# reqids2txids.get(reqid),
|
||||||
|
# TooFastEdit
|
||||||
|
# )
|
||||||
):
|
):
|
||||||
# TODO: don't even allow this case
|
# TODO: don't even allow this case
|
||||||
# by not moving the client side line
|
# by not moving the client side line
|
||||||
|
|
@ -1186,7 +1205,9 @@ async def handle_order_updates(
|
||||||
txid
|
txid
|
||||||
|
|
||||||
# we throttle too-fast-requests on the ems side
|
# we throttle too-fast-requests on the ems side
|
||||||
and not isinstance(txid, TooFastEdit)
|
and
|
||||||
|
reqid in toofastedit
|
||||||
|
# not isinstance(txid, TooFastEdit)
|
||||||
):
|
):
|
||||||
# client was editting too quickly
|
# client was editting too quickly
|
||||||
# so we instead cancel this order
|
# so we instead cancel this order
|
||||||
|
|
|
||||||
|
|
@ -1022,13 +1022,22 @@ async def open_order_mode(
|
||||||
started.set()
|
started.set()
|
||||||
|
|
||||||
for oid, msg in ems_dialog_msgs.items():
|
for oid, msg in ems_dialog_msgs.items():
|
||||||
|
|
||||||
# HACK ALERT: ensure a resp field is filled out since
|
# HACK ALERT: ensure a resp field is filled out since
|
||||||
# techincally the call below expects a ``Status``. TODO:
|
# techincally the call below expects a ``Status``. TODO:
|
||||||
# parse into proper ``Status`` equivalents ems-side?
|
# parse into proper ``Status`` equivalents ems-side?
|
||||||
# msg.setdefault('resp', msg['broker_details']['resp'])
|
# msg.setdefault('resp', msg['broker_details']['resp'])
|
||||||
# msg.setdefault('oid', msg['broker_details']['oid'])
|
# msg.setdefault('oid', msg['broker_details']['oid'])
|
||||||
msg['brokerd_msg'] = msg
|
ya_msg: dict = msg.setdefault(
|
||||||
|
'brokerd_msg',
|
||||||
|
msg,
|
||||||
|
)
|
||||||
|
if msg is not ya_msg:
|
||||||
|
log.warning(
|
||||||
|
f'A `.brokerd_msg` was already set for ems-dialog msg?\n'
|
||||||
|
f'oid: {oid!r}\n'
|
||||||
|
f'ya_msg: {ya_msg!r}\n'
|
||||||
|
f'msg: {ya_msg!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
await process_trade_msg(
|
await process_trade_msg(
|
||||||
mode,
|
mode,
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue