Support live order loading (with caveats)

As you'd expect query and sync the EMS with existing live orders
reported by the market venue by packing them in `Status` msgs and
sending over the order dialog stream before starting the handler tasks.

XXX CAVEAT:
- there appears to be no way (at least on the usdtm market/venue) to
  distinguish between different contracts such as perps vs. the
  quarterlies?
- for now we just assume that the perp is being used since
  there's no indicator otherwise in the 'symbol' field?
- we should maybe open an issue with the futures-connector project to
  see how they'd recommend solving this discrepancy?
basic_buy_bot
Tyler Goodlet 2023-06-19 11:04:38 -04:00
parent dc3ac8de01
commit 5c315ba163
2 changed files with 123 additions and 11 deletions

View File

@ -49,6 +49,9 @@ from fuzzywuzzy import process as fuzzy
import numpy as np
from piker import config
from piker.clearing._messages import (
Order,
)
from piker.accounting import (
Asset,
digits_to_dec,
@ -378,9 +381,6 @@ class Client:
raise SymbolNotFound(f'No market pairs found!?:\n{resp}')
pairs_view_subtable: dict[str, Pair] = {}
# if venue == 'spot':
# import tractor
# await tractor.breakpoint()
for item in mkt_pairs:
filters_ls: list = item.pop('filters', False)
@ -619,6 +619,68 @@ class Client:
signed=True,
)
async def get_open_orders(
self,
symbol: str | None = None,
) -> list[Order]:
'''
Get all open orders for venue-account.
WARNING: apparently not specifying the symbol is given
a much heavier API "weight" meaning you shouldn't call it
often to avoid getting throttled as per:
'https://binance-docs.github.io/apidocs/futures/en/#current-all-open-orders-user_data
'''
params: dict[str, Any] = {
'timestamp': binance_timestamp(now()),
}
if symbol is not None:
params['symbol'] = symbol
resp = await self.mkt_mode_req[self.mkt_mode](
'openOrders',
params=params,
signed=True,
action='get',
)
orders: list[Order] = []
for entry in resp:
oid: str = entry['clientOrderId']
# XXX TODO XXX: it appears as though entries have no
# indicator from the symbology system which market
# / venue the order is from.. which normally isn't
# a huge deal since you could assume based on the
# endpoint you made the request to, BUT the futes USD-M
# endpoints have multiple contracts for the same
# symbols (eg. BTCUSDT.PERP, BTCUSDT.230630.. etc.)
# NOTE: for now until we have a better system we're
# going to assume orders that don't have some kind of
# further info in the order resp dict are perps though
# likely this will need to change in the future..
venue: str = self.mkt_mode.rstrip('_futes')
bs_mktid: str = entry['symbol']
fqme: str = f'{bs_mktid.lower()}.{venue}.perp'
orders.append(
Order(
oid=oid,
symbol=fqme,
action=entry['side'].lower(),
price=float(entry['price']),
size=float(entry['origQty']),
exec_mode='live',
account=f'binance.{venue}',
)
)
return orders
async def submit_limit(
self,
symbol: str,

View File

@ -62,6 +62,8 @@ from piker.clearing._messages import (
BrokerdFill,
BrokerdCancel,
BrokerdError,
Status,
Order,
)
from .venues import Pair
from .api import Client
@ -69,6 +71,10 @@ from .api import Client
log = get_logger('piker.brokers.binance')
# TODO: factor this into `.clearing._util` (or something)
# and use in other backends like kraken which currently has
# a less formalized version more or less:
# `apiflows[reqid].maps.append(status_msg.to_dict())`
class OrderDialogs(Struct):
'''
Order control dialog (and thus transaction) tracking via
@ -79,25 +85,49 @@ class OrderDialogs(Struct):
state using the entire (reverse chronological) msg flow.
'''
_dialogs: defaultdict[str, ChainMap] = defaultdict(ChainMap)
_flows: dict[str, ChainMap] = {}
def add_msg(
self,
oid: str,
msg: dict,
) -> None:
self._dialogs[oid].maps.insert(0, msg)
# NOTE: manually enter a new map on the first msg add to
# avoid creating one with an empty dict first entry in
# `ChainMap.maps` which is the default if none passed at
# init.
cm: ChainMap = self._flows.get(oid)
if cm:
cm.maps.insert(0, msg)
else:
cm = ChainMap(msg)
self._flows[oid] = cm
# TODO: wrap all this in the `collections.abc.Mapping` interface?
def get(
self,
oid: str,
) -> ChainMap[str, Any]:
'''
Return the dialog `ChainMap` for provided id.
'''
return self._dialogs.get(oid, None)
return self._flows.get(oid, None)
def pop(
self,
oid: str,
) -> ChainMap[str, Any]:
'''
Pop and thus remove the `ChainMap` containing the msg flow
for the given order id.
'''
return self._flows.pop(oid)
async def handle_order_requests(
@ -277,11 +307,15 @@ async def open_trade_dialog(
f"{listen_key}@account",
f"{listen_key}@balance",
f"{listen_key}@position",
# TODO: does this even work!? seems to cause
# a hang on the first msg..? lelelel.
# f"{listen_key}@order",
],
"id": nsid
})
with trio.fail_after(1):
with trio.fail_after(6):
msg = await wss.recv_msg()
assert msg['id'] == nsid
@ -401,6 +435,24 @@ async def open_trade_dialog(
trio.open_nursery() as tn,
ctx.open_stream() as ems_stream,
):
# deliver all pre-exist open orders to EMS thus syncing
# state with the binance existing live limit set.
open_orders: list[Order] = await client.get_open_orders()
# fill out `Status` with boxed `Order`s and sync the EMS.
for order in open_orders:
status_msg = Status(
time_ns=time.time_ns(),
resp='open',
oid=order.oid,
reqid=order.oid,
# embedded order info
req=order,
src='binance',
)
dialogs.add_msg(order.oid, order.to_dict())
await ems_stream.send(status_msg)
tn.start_soon(
handle_order_requests,
@ -565,16 +617,14 @@ async def handle_order_updates(
if accum_size_filled == req_size:
status = 'closed'
del dialogs._dialogs[oid]
dialogs.pop(oid)
case 'NEW':
status = 'open'
case 'EXPIRED':
status = 'canceled'
del dialogs._dialogs[oid]
# case 'TRADE':
dialogs.pop(oid)
case _:
status = status.lower()