Wrap dialog tracking in new `OrderDialogs` type, info log all user stream msgs

basic_buy_bot
Tyler Goodlet 2023-06-17 14:45:45 -04:00
parent 45ded4f2d1
commit 8a06e4d073
1 changed files with 75 additions and 23 deletions

View File

@ -22,7 +22,10 @@ Live order control B)
''' '''
from __future__ import annotations from __future__ import annotations
from collections import ChainMap from collections import (
ChainMap,
defaultdict,
)
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
@ -42,6 +45,7 @@ from piker.accounting import (
from piker.brokers._util import ( from piker.brokers._util import (
get_logger, get_logger,
) )
from piker.data.types import Struct
from piker.data._web_bs import ( from piker.data._web_bs import (
open_autorecon_ws, open_autorecon_ws,
NoBsWs, NoBsWs,
@ -65,12 +69,38 @@ from .api import Client
log = get_logger('piker.brokers.binance') log = get_logger('piker.brokers.binance')
class OrderDialogs(Struct):
'''
Order control dialog (and thus transaction) tracking via
message recording.
Allows easily recording messages associated with a given set of
order control transactions and looking up the latest field
state using the entire (reverse chronological) msg flow.
'''
_dialogs: defaultdict[str, ChainMap] = defaultdict(ChainMap)
def add_msg(
self,
oid: str,
msg: dict,
) -> None:
self._dialogs[oid].maps.insert(0, msg)
def get(
self,
oid: str,
field: str,
) -> Any:
return self._dialogs[oid][field]
async def handle_order_requests( async def handle_order_requests(
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
client: Client, client: Client,
dids: bidict[str, str],
# TODO: update this from open orders loaded at boot! dialogs: OrderDialogs,
dialogs: ChainMap[str, BrokerdOrder] = ChainMap(),
) -> None: ) -> None:
''' '''
@ -92,8 +122,11 @@ async def handle_order_requests(
) )
await ems_order_stream.send(BrokerdError( await ems_order_stream.send(BrokerdError(
oid=cancel.oid, oid=cancel.oid,
# TODO: do we need the symbol? # TODO: do we need the symbol?
# https://github.com/pikers/piker/issues/514
symbol='unknown', symbol='unknown',
reason=( reason=(
'Invalid `binance` order request dialog oid', 'Invalid `binance` order request dialog oid',
) )
@ -113,6 +146,7 @@ async def handle_order_requests(
# validate # validate
order = BrokerdOrder(**msg) order = BrokerdOrder(**msg)
oid: str = order.oid # emsd order id
# NOTE: check and report edits # NOTE: check and report edits
if existing := dialogs.get(order.oid): if existing := dialogs.get(order.oid):
@ -123,16 +157,6 @@ async def handle_order_requests(
# TODO: figure out what special params we have to send? # TODO: figure out what special params we have to send?
# https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade # https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade
# XXX: ACK the request **immediately** before sending
# the api side request to ensure the ems maps the oid ->
# reqid correctly!
resp = BrokerdOrderAck(
oid=order.oid, # ems order request id
reqid=order.oid, # our custom int mapping
account='binance', # piker account
)
await ems_order_stream.send(resp)
# lookup the binance-native symbol # lookup the binance-native symbol
bs_mktid: str = client._pairs[order.symbol.upper()].symbol bs_mktid: str = client._pairs[order.symbol.upper()].symbol
@ -143,13 +167,28 @@ async def handle_order_requests(
side=order.action, side=order.action,
quantity=order.size, quantity=order.size,
price=order.price, price=order.price,
oid=order.oid oid=oid,
) )
# thank god at least someone lets us do this XD
assert reqid == order.oid
# track latest request state # XXX: ACK the request **immediately** before sending
dialogs[reqid].maps.append(msg) # the api side request to ensure the ems maps the oid ->
# reqid correctly!
resp = BrokerdOrderAck(
oid=oid, # ems order request id
reqid=reqid, # our custom int mapping
account='binance', # piker account
)
await ems_order_stream.send(resp)
# SMH they do gen their own order id: ints..
# assert reqid == order.oid
dids[order.oid] = reqid
# track latest request state such that map
# lookups start at the most recent msg and then
# scan reverse-chronologically.
dialogs.add_msg(msg)
except BrokerError as be: except BrokerError as be:
await ems_order_stream.send( await ems_order_stream.send(
BrokerdError( BrokerdError(
@ -190,10 +229,6 @@ async def open_trade_dialog(
await ctx.started('paper') await ctx.started('paper')
return return
# table: PpTable
# ledger: TransactionLedger
# TODO: load pps and accounts using accounting apis!
async with ( async with (
open_cached_client('binance') as client, open_cached_client('binance') as client,
): ):
@ -323,6 +358,20 @@ async def open_trade_dialog(
await ctx.started((positions, list(accounts))) await ctx.started((positions, list(accounts)))
dialogs = OrderDialogs()
dids: dict[str, int] = bidict()
# TODO: further init setup things to get full EMS and
# .accounting support B)
# - live order loading via user stream subscription and
# update to the order dialog table.
# - position loading using `piker.accounting` subsys
# and comparison with binance's own position calcs.
# - load pps and accounts using accounting apis, write
# the ledger and account files
# - table: PpTable
# - ledger: TransactionLedger
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as tn,
ctx.open_stream() as ems_stream, ctx.open_stream() as ems_stream,
@ -332,6 +381,8 @@ async def open_trade_dialog(
handle_order_requests, handle_order_requests,
ems_stream, ems_stream,
client, client,
dids,
dialogs,
) )
tn.start_soon( tn.start_soon(
handle_order_updates, handle_order_updates,
@ -368,6 +419,7 @@ async def handle_order_updates(
''' '''
async for msg in wss: async for msg in wss:
match msg: match msg:
log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}')
# TODO: # TODO:
# POSITION update # POSITION update