Fix user event matching
Was using the wrong key before from our old code (not sure how that slipped back in.. prolly doing too many git stashes XD), so fix that to properly match against order update events with 'ORDER_TRADE_UPDATE'. Also, don't match on the types we want to *cast to*, that's not how match syntax works (facepalm), so we have to typecast prior to EMS msg creation / downstream logic. Further, - try not bothering with binance's own internal `'orderId'` field tracking since they seem to support just using your own user version for all ctl endpoints? (thus we only need to track the EMS `.oid`s B) - log all event update msgs for now. - pop order dialogs on 'closed' statuses. - wrap cancel requests in an error handler block since it seems the EMS is double sending requests from the client?basic_buy_bot
parent
09007cbf08
commit
3f555b2f5a
|
@ -88,12 +88,16 @@ class OrderDialogs(Struct):
|
||||||
) -> None:
|
) -> None:
|
||||||
self._dialogs[oid].maps.insert(0, msg)
|
self._dialogs[oid].maps.insert(0, msg)
|
||||||
|
|
||||||
|
# TODO: wrap all this in the `collections.abc.Mapping` interface?
|
||||||
def get(
|
def get(
|
||||||
self,
|
self,
|
||||||
oid: str,
|
oid: str,
|
||||||
field: str,
|
) -> ChainMap[str, Any]:
|
||||||
) -> Any:
|
'''
|
||||||
return self._dialogs[oid][field]
|
Return the dialog `ChainMap` for provided id.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return self._dialogs.get(oid, None)
|
||||||
|
|
||||||
|
|
||||||
async def handle_order_requests(
|
async def handle_order_requests(
|
||||||
|
@ -134,10 +138,23 @@ async def handle_order_requests(
|
||||||
continue
|
continue
|
||||||
|
|
||||||
else:
|
else:
|
||||||
await client.submit_cancel(
|
symbol: str = existing['symbol']
|
||||||
existing.symbol,
|
try:
|
||||||
cancel.oid,
|
await client.submit_cancel(
|
||||||
)
|
symbol,
|
||||||
|
cancel.oid,
|
||||||
|
)
|
||||||
|
except BrokerError as be:
|
||||||
|
await ems_order_stream.send(
|
||||||
|
BrokerdError(
|
||||||
|
oid=msg['oid'],
|
||||||
|
symbol=symbol,
|
||||||
|
reason=(
|
||||||
|
'`binance` CANCEL failed:\n'
|
||||||
|
f'{be}'
|
||||||
|
))
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'account': ('binance.usdtm' | 'binance.spot') as account,
|
'account': ('binance.usdtm' | 'binance.spot') as account,
|
||||||
|
@ -151,35 +168,35 @@ async def handle_order_requests(
|
||||||
# NOTE: check and report edits
|
# NOTE: check and report edits
|
||||||
if existing := dialogs.get(order.oid):
|
if existing := dialogs.get(order.oid):
|
||||||
log.info(
|
log.info(
|
||||||
f'Existing order for {existing.oid} updated:\n'
|
f'Existing order for {oid} updated:\n'
|
||||||
f'{pformat(existing.to_dict())} -> {pformat(msg)}'
|
f'{pformat(existing.maps[-1])} -> {pformat(msg)}'
|
||||||
)
|
)
|
||||||
# TODO: figure out what special params we have to send?
|
# TODO: figure out what special params we have to send?
|
||||||
# https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade
|
# https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade
|
||||||
|
|
||||||
# lookup the binance-native symbol
|
# lookup the binance-native symbol
|
||||||
bs_mktid: str = client._pairs[order.symbol.upper()].symbol
|
# bs_mktid: str = client._pairs[order.symbol.upper()].symbol
|
||||||
|
|
||||||
# call our client api to submit the order
|
# call our client api to submit the order
|
||||||
try:
|
try:
|
||||||
reqid = await client.submit_limit(
|
|
||||||
symbol=bs_mktid,
|
|
||||||
side=order.action,
|
|
||||||
quantity=order.size,
|
|
||||||
price=order.price,
|
|
||||||
oid=oid,
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX: ACK the request **immediately** before sending
|
# XXX: ACK the request **immediately** before sending
|
||||||
# the api side request to ensure the ems maps the oid ->
|
# the api side request to ensure the ems maps the oid ->
|
||||||
# reqid correctly!
|
# reqid correctly!
|
||||||
resp = BrokerdOrderAck(
|
resp = BrokerdOrderAck(
|
||||||
oid=oid, # ems order request id
|
oid=oid, # ems order request id
|
||||||
reqid=reqid, # our custom int mapping
|
reqid=oid, # our custom int mapping
|
||||||
account='binance', # piker account
|
account='binance', # piker account
|
||||||
)
|
)
|
||||||
await ems_order_stream.send(resp)
|
await ems_order_stream.send(resp)
|
||||||
|
|
||||||
|
reqid = await client.submit_limit(
|
||||||
|
symbol=order.symbol,
|
||||||
|
side=order.action,
|
||||||
|
quantity=order.size,
|
||||||
|
price=order.price,
|
||||||
|
oid=oid,
|
||||||
|
)
|
||||||
|
|
||||||
# SMH they do gen their own order id: ints..
|
# SMH they do gen their own order id: ints..
|
||||||
# assert reqid == order.oid
|
# assert reqid == order.oid
|
||||||
dids[order.oid] = reqid
|
dids[order.oid] = reqid
|
||||||
|
@ -187,7 +204,7 @@ async def handle_order_requests(
|
||||||
# track latest request state such that map
|
# track latest request state such that map
|
||||||
# lookups start at the most recent msg and then
|
# lookups start at the most recent msg and then
|
||||||
# scan reverse-chronologically.
|
# scan reverse-chronologically.
|
||||||
dialogs.add_msg(msg)
|
dialogs.add_msg(oid, msg)
|
||||||
|
|
||||||
except BrokerError as be:
|
except BrokerError as be:
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
|
@ -235,7 +252,7 @@ async def open_trade_dialog(
|
||||||
client.mkt_mode: str = 'usdtm_futes'
|
client.mkt_mode: str = 'usdtm_futes'
|
||||||
|
|
||||||
# if client.
|
# if client.
|
||||||
account: str = client.mkt_mode
|
venue: str = client.mkt_mode
|
||||||
|
|
||||||
wss: NoBsWs
|
wss: NoBsWs
|
||||||
async with (
|
async with (
|
||||||
|
@ -332,14 +349,14 @@ async def open_trade_dialog(
|
||||||
|
|
||||||
pair: Pair | None
|
pair: Pair | None
|
||||||
if (
|
if (
|
||||||
pair := client._venue2pairs[account].get(bs_mktid)
|
pair := client._venue2pairs[venue].get(bs_mktid)
|
||||||
and entry_size > 0
|
and entry_size > 0
|
||||||
):
|
):
|
||||||
entry_price: float = float(entry['entryPrice'])
|
entry_price: float = float(entry['entryPrice'])
|
||||||
|
|
||||||
ppmsg = BrokerdPosition(
|
ppmsg = BrokerdPosition(
|
||||||
broker='binance',
|
broker='binance',
|
||||||
account='binance.futes',
|
account='binance.usdtm',
|
||||||
|
|
||||||
# TODO: maybe we should be passing back
|
# TODO: maybe we should be passing back
|
||||||
# a `MktPair` here?
|
# a `MktPair` here?
|
||||||
|
@ -365,6 +382,9 @@ async def open_trade_dialog(
|
||||||
# .accounting support B)
|
# .accounting support B)
|
||||||
# - live order loading via user stream subscription and
|
# - live order loading via user stream subscription and
|
||||||
# update to the order dialog table.
|
# update to the order dialog table.
|
||||||
|
# - MAKE SURE we add live orders loaded during init
|
||||||
|
# into the dialogs table to ensure they can be
|
||||||
|
# cancelled, meaning we can do a symbol lookup.
|
||||||
# - position loading using `piker.accounting` subsys
|
# - position loading using `piker.accounting` subsys
|
||||||
# and comparison with binance's own position calcs.
|
# and comparison with binance's own position calcs.
|
||||||
# - load pps and accounts using accounting apis, write
|
# - load pps and accounts using accounting apis, write
|
||||||
|
@ -388,6 +408,7 @@ async def open_trade_dialog(
|
||||||
handle_order_updates,
|
handle_order_updates,
|
||||||
ems_stream,
|
ems_stream,
|
||||||
wss,
|
wss,
|
||||||
|
dialogs,
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -397,6 +418,7 @@ async def open_trade_dialog(
|
||||||
async def handle_order_updates(
|
async def handle_order_updates(
|
||||||
ems_stream: tractor.MsgStream,
|
ems_stream: tractor.MsgStream,
|
||||||
wss: NoBsWs,
|
wss: NoBsWs,
|
||||||
|
dialogs: OrderDialogs,
|
||||||
|
|
||||||
# apiflows: dict[int, ChainMap[dict[str, dict]]],
|
# apiflows: dict[int, ChainMap[dict[str, dict]]],
|
||||||
# ids: bidict[str, int],
|
# ids: bidict[str, int],
|
||||||
|
@ -418,20 +440,55 @@ async def handle_order_updates(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async for msg in wss:
|
async for msg in wss:
|
||||||
|
log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}')
|
||||||
match msg:
|
match msg:
|
||||||
log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}')
|
|
||||||
|
|
||||||
# TODO:
|
# TODO:
|
||||||
# POSITION update
|
# POSITION update
|
||||||
# futes: https://binance-docs.github.io/apidocs/futures/en/#event-balance-and-position-update
|
|
||||||
|
|
||||||
# ORDER update
|
# ORDER update
|
||||||
# spot: https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
|
# spot: https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
|
||||||
# futes: https://binance-docs.github.io/apidocs/futures/en/#event-order-update
|
# futes: https://binance-docs.github.io/apidocs/futures/en/#event-order-update
|
||||||
|
# futes: https://binance-docs.github.io/apidocs/futures/en/#event-balance-and-position-update
|
||||||
|
# {'o': {
|
||||||
|
# 'L': '0',
|
||||||
|
# 'N': 'USDT',
|
||||||
|
# 'R': False,
|
||||||
|
# 'S': 'BUY',
|
||||||
|
# 'T': 1687028772484,
|
||||||
|
# 'X': 'NEW',
|
||||||
|
# 'a': '0',
|
||||||
|
# 'ap': '0',
|
||||||
|
# 'b': '7012.06520',
|
||||||
|
# 'c': '518d4122-8d3e-49b0-9a1e-1fabe6f62e4c',
|
||||||
|
# 'cp': False,
|
||||||
|
# 'f': 'GTC',
|
||||||
|
# 'i': 3376956924,
|
||||||
|
# 'l': '0',
|
||||||
|
# 'm': False,
|
||||||
|
# 'n': '0',
|
||||||
|
# 'o': 'LIMIT',
|
||||||
|
# 'ot': 'LIMIT',
|
||||||
|
# 'p': '21136.80',
|
||||||
|
# 'pP': False,
|
||||||
|
# 'ps': 'BOTH',
|
||||||
|
# 'q': '0.047',
|
||||||
|
# 'rp': '0',
|
||||||
|
# 's': 'BTCUSDT',
|
||||||
|
# 'si': 0,
|
||||||
|
# 'sp': '0',
|
||||||
|
# 'ss': 0,
|
||||||
|
# 't': 0,
|
||||||
|
# 'wt': 'CONTRACT_PRICE',
|
||||||
|
# 'x': 'NEW',
|
||||||
|
# 'z': '0'}
|
||||||
|
# }
|
||||||
case {
|
case {
|
||||||
'e': 'executionReport',
|
# 'e': 'executionReport',
|
||||||
'T': float(epoch_ms),
|
'e': 'ORDER_TRADE_UPDATE',
|
||||||
|
'T': int(epoch_ms),
|
||||||
'o': {
|
'o': {
|
||||||
|
'i': reqid,
|
||||||
's': bs_mktid,
|
's': bs_mktid,
|
||||||
|
|
||||||
# XXX NOTE XXX see special ids for market
|
# XXX NOTE XXX see special ids for market
|
||||||
|
@ -444,22 +501,22 @@ async def handle_order_updates(
|
||||||
'c': oid,
|
'c': oid,
|
||||||
|
|
||||||
# prices
|
# prices
|
||||||
'a': float(submit_price),
|
'a': submit_price,
|
||||||
'ap': float(avg_price),
|
'ap': avg_price,
|
||||||
'L': float(fill_price),
|
'L': fill_price,
|
||||||
|
|
||||||
# sizing
|
# sizing
|
||||||
'q': float(req_size),
|
'q': req_size,
|
||||||
'l': float(clear_size_filled), # this event
|
'l': clear_size_filled, # this event
|
||||||
'z': float(accum_size_filled), # accum
|
'z': accum_size_filled, # accum
|
||||||
|
|
||||||
# commissions
|
# commissions
|
||||||
'n': float(cost),
|
'n': cost,
|
||||||
'N': str(cost_asset),
|
'N': cost_asset,
|
||||||
|
|
||||||
# state
|
# state
|
||||||
'S': str(side),
|
'S': side,
|
||||||
'X': str(status),
|
'X': status,
|
||||||
},
|
},
|
||||||
} as order_msg:
|
} as order_msg:
|
||||||
log.info(
|
log.info(
|
||||||
|
@ -485,12 +542,18 @@ async def handle_order_updates(
|
||||||
# - CANCELED
|
# - CANCELED
|
||||||
# - EXPIRED
|
# - EXPIRED
|
||||||
# https://binance-docs.github.io/apidocs/futures/en/#event-order-update
|
# https://binance-docs.github.io/apidocs/futures/en/#event-order-update
|
||||||
|
|
||||||
|
req_size: float = float(req_size)
|
||||||
|
accum_size_filled: float = float(accum_size_filled)
|
||||||
|
fill_price: float = float(fill_price)
|
||||||
|
|
||||||
match status:
|
match status:
|
||||||
case 'PARTIALLY_FILLED' | 'FILLED':
|
case 'PARTIALLY_FILLED' | 'FILLED':
|
||||||
status = 'fill'
|
status = 'fill'
|
||||||
|
|
||||||
fill_msg = BrokerdFill(
|
fill_msg = BrokerdFill(
|
||||||
time_ns=time_ns(),
|
time_ns=time_ns(),
|
||||||
|
# reqid=reqid,
|
||||||
reqid=oid,
|
reqid=oid,
|
||||||
|
|
||||||
# just use size value for now?
|
# just use size value for now?
|
||||||
|
@ -507,21 +570,26 @@ async def handle_order_updates(
|
||||||
|
|
||||||
if accum_size_filled == req_size:
|
if accum_size_filled == req_size:
|
||||||
status = 'closed'
|
status = 'closed'
|
||||||
|
del dialogs._dialogs[oid]
|
||||||
|
|
||||||
case 'NEW':
|
case 'NEW':
|
||||||
status = 'open'
|
status = 'open'
|
||||||
|
|
||||||
case 'EXPIRED':
|
case 'EXPIRED':
|
||||||
status = 'canceled'
|
status = 'canceled'
|
||||||
|
del dialogs._dialogs[oid]
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
status = status.lower()
|
status = status.lower()
|
||||||
|
|
||||||
resp = BrokerdStatus(
|
resp = BrokerdStatus(
|
||||||
time_ns=time_ns(),
|
time_ns=time_ns(),
|
||||||
|
# reqid=reqid,
|
||||||
reqid=oid,
|
reqid=oid,
|
||||||
|
# account='binance.usdtm',
|
||||||
|
|
||||||
status=status,
|
status=status,
|
||||||
|
|
||||||
filled=accum_size_filled,
|
filled=accum_size_filled,
|
||||||
remaining=req_size - accum_size_filled,
|
remaining=req_size - accum_size_filled,
|
||||||
broker_details={
|
broker_details={
|
||||||
|
@ -530,3 +598,9 @@ async def handle_order_updates(
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
await ems_stream.send(resp)
|
await ems_stream.send(resp)
|
||||||
|
|
||||||
|
case _:
|
||||||
|
log.warning(
|
||||||
|
'Unhandled event:\n'
|
||||||
|
f'{pformat(msg)}'
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in New Issue