Add full real-time position update support B)

There was one trick which was that it seems that binance will often send
the account/position update event over the user stream *before* the
actual clearing (aka FILLED) order update event, so make sure we put an
entry in the `dialogs: OrderDialogs` as soon as an order request comes
in such that even if the account update arrives first the
`BrokerdPosition` msg can be relayed without delay / order event order
considerations.
basic_buy_bot
Tyler Goodlet 2023-06-17 18:00:23 -04:00
parent 4eeb232248
commit 572badb4d8
1 changed files with 69 additions and 34 deletions

View File

@ -174,11 +174,11 @@ async def handle_order_requests(
# TODO: figure out what special params we have to send? # TODO: figure out what special params we have to send?
# https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade # https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade
# lookup the binance-native symbol # track latest request state such that map
# bs_mktid: str = client._pairs[order.symbol.upper()].symbol # lookups start at the most recent msg and then
# scan reverse-chronologically.
dialogs.add_msg(oid, msg)
# call our client api to submit the order
try:
# XXX: ACK the request **immediately** before sending # XXX: ACK the request **immediately** before sending
# the api side request to ensure the ems maps the oid -> # the api side request to ensure the ems maps the oid ->
# reqid correctly! # reqid correctly!
@ -189,6 +189,8 @@ async def handle_order_requests(
) )
await ems_order_stream.send(resp) await ems_order_stream.send(resp)
# call our client api to submit the order
try:
reqid = await client.submit_limit( reqid = await client.submit_limit(
symbol=order.symbol, symbol=order.symbol,
side=order.action, side=order.action,
@ -201,11 +203,6 @@ async def handle_order_requests(
# assert reqid == order.oid # assert reqid == order.oid
dids[order.oid] = reqid dids[order.oid] = reqid
# track latest request state such that map
# lookups start at the most recent msg and then
# scan reverse-chronologically.
dialogs.add_msg(oid, msg)
except BrokerError as be: except BrokerError as be:
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
@ -347,9 +344,9 @@ async def open_trade_dialog(
bs_mktid: str = entry['symbol'] bs_mktid: str = entry['symbol']
entry_size: float = float(entry['positionAmt']) entry_size: float = float(entry['positionAmt'])
pair: Pair | None pair: Pair | None = client._venue2pairs[venue].get(bs_mktid)
if ( if (
pair := client._venue2pairs[venue].get(bs_mktid) pair
and entry_size > 0 and entry_size > 0
): ):
entry_price: float = float(entry['entryPrice']) entry_price: float = float(entry['entryPrice'])
@ -406,6 +403,8 @@ async def open_trade_dialog(
) )
tn.start_soon( tn.start_soon(
handle_order_updates, handle_order_updates,
venue,
client,
ems_stream, ems_stream,
wss, wss,
dialogs, dialogs,
@ -416,21 +415,12 @@ async def open_trade_dialog(
async def handle_order_updates( async def handle_order_updates(
venue: str,
client: Client,
ems_stream: tractor.MsgStream, ems_stream: tractor.MsgStream,
wss: NoBsWs, wss: NoBsWs,
dialogs: OrderDialogs, dialogs: OrderDialogs,
# apiflows: dict[int, ChainMap[dict[str, dict]]],
# ids: bidict[str, int],
# reqids2txids: bidict[int, str],
# table: PpTable,
# ledger_trans: dict[str, Transaction],
# acctid: str,
# acc_name: str,
# token: str,
) -> None: ) -> None:
''' '''
Main msg handling loop for all things order management. Main msg handling loop for all things order management.
@ -443,9 +433,6 @@ async def handle_order_updates(
log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}') log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}')
match msg: match msg:
# TODO:
# POSITION update
# ORDER update # ORDER update
# spot: https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update # spot: https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
# futes: https://binance-docs.github.io/apidocs/futures/en/#event-order-update # futes: https://binance-docs.github.io/apidocs/futures/en/#event-order-update
@ -488,7 +475,6 @@ async def handle_order_updates(
'e': 'ORDER_TRADE_UPDATE', 'e': 'ORDER_TRADE_UPDATE',
'T': int(epoch_ms), 'T': int(epoch_ms),
'o': { 'o': {
'i': reqid,
's': bs_mktid, 's': bs_mktid,
# XXX NOTE XXX see special ids for market # XXX NOTE XXX see special ids for market
@ -499,6 +485,7 @@ async def handle_order_updates(
# // "settlement_autoclose-": settlement order # // "settlement_autoclose-": settlement order
# for delisting or delivery # for delisting or delivery
'c': oid, 'c': oid,
# 'i': reqid, # binance internal int id
# prices # prices
'a': submit_price, 'a': submit_price,
@ -579,6 +566,8 @@ async def handle_order_updates(
status = 'canceled' status = 'canceled'
del dialogs._dialogs[oid] del dialogs._dialogs[oid]
# case 'TRADE':
case _: case _:
status = status.lower() status = status.lower()
@ -599,6 +588,52 @@ async def handle_order_updates(
) )
await ems_stream.send(resp) await ems_stream.send(resp)
# ACCOUNT and POSITION update B)
# {
# 'E': 1687036749218,
# 'e': 'ACCOUNT_UPDATE'
# 'T': 1687036749215,
# 'a': {'B': [{'a': 'USDT',
# 'bc': '0',
# 'cw': '1267.48920735',
# 'wb': '1410.90245576'}],
# 'P': [{'cr': '-3292.10973007',
# 'ep': '26349.90000',
# 'iw': '143.41324841',
# 'ma': 'USDT',
# 'mt': 'isolated',
# 'pa': '0.038',
# 'ps': 'BOTH',
# 's': 'BTCUSDT',
# 'up': '5.17555453'}],
# 'm': 'ORDER'},
# }
case {
'T': int(epoch_ms),
'e': 'ACCOUNT_UPDATE',
'a': {
'P': [{
's': bs_mktid,
'pa': pos_amount,
'ep': entry_price,
}],
},
}:
# real-time relay position updates back to EMS
pair: Pair | None = client._venue2pairs[venue].get(bs_mktid)
ppmsg = BrokerdPosition(
broker='binance',
account='binance.usdtm',
# TODO: maybe we should be passing back
# a `MktPair` here?
symbol=pair.bs_fqme.lower() + '.binance',
size=float(pos_amount),
avg_price=float(entry_price),
)
await ems_stream.send(ppmsg)
case _: case _:
log.warning( log.warning(
'Unhandled event:\n' 'Unhandled event:\n'