From 8a06e4d0732062c5b265108d5d568cb0f337f0b8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 17 Jun 2023 14:45:45 -0400 Subject: [PATCH] Wrap dialog tracking in new `OrderDialogs` type, info log all user stream msgs --- piker/brokers/binance/broker.py | 98 +++++++++++++++++++++++++-------- 1 file changed, 75 insertions(+), 23 deletions(-) diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index 2eff326f..1926fdba 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -22,7 +22,10 @@ Live order control B) ''' from __future__ import annotations -from collections import ChainMap +from collections import ( + ChainMap, + defaultdict, +) from pprint import pformat from typing import ( Any, @@ -42,6 +45,7 @@ from piker.accounting import ( from piker.brokers._util import ( get_logger, ) +from piker.data.types import Struct from piker.data._web_bs import ( open_autorecon_ws, NoBsWs, @@ -65,12 +69,38 @@ from .api import Client log = get_logger('piker.brokers.binance') +class OrderDialogs(Struct): + ''' + Order control dialog (and thus transaction) tracking via + message recording. + + Allows easily recording messages associated with a given set of + order control transactions and looking up the latest field + state using the entire (reverse chronological) msg flow. + + ''' + _dialogs: defaultdict[str, ChainMap] = defaultdict(ChainMap) + + def add_msg( + self, + oid: str, + msg: dict, + ) -> None: + self._dialogs[oid].maps.insert(0, msg) + + def get( + self, + oid: str, + field: str, + ) -> Any: + return self._dialogs[oid][field] + + async def handle_order_requests( ems_order_stream: tractor.MsgStream, client: Client, - - # TODO: update this from open orders loaded at boot! - dialogs: ChainMap[str, BrokerdOrder] = ChainMap(), + dids: bidict[str, str], + dialogs: OrderDialogs, ) -> None: ''' @@ -92,8 +122,11 @@ async def handle_order_requests( ) await ems_order_stream.send(BrokerdError( oid=cancel.oid, + # TODO: do we need the symbol? + # https://github.com/pikers/piker/issues/514 symbol='unknown', + reason=( 'Invalid `binance` order request dialog oid', ) @@ -113,6 +146,7 @@ async def handle_order_requests( # validate order = BrokerdOrder(**msg) + oid: str = order.oid # emsd order id # NOTE: check and report edits if existing := dialogs.get(order.oid): @@ -123,16 +157,6 @@ async def handle_order_requests( # TODO: figure out what special params we have to send? # https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade - # XXX: ACK the request **immediately** before sending - # the api side request to ensure the ems maps the oid -> - # reqid correctly! - resp = BrokerdOrderAck( - oid=order.oid, # ems order request id - reqid=order.oid, # our custom int mapping - account='binance', # piker account - ) - await ems_order_stream.send(resp) - # lookup the binance-native symbol bs_mktid: str = client._pairs[order.symbol.upper()].symbol @@ -143,13 +167,28 @@ async def handle_order_requests( side=order.action, quantity=order.size, price=order.price, - oid=order.oid + oid=oid, ) - # thank god at least someone lets us do this XD - assert reqid == order.oid - # track latest request state - dialogs[reqid].maps.append(msg) + # XXX: ACK the request **immediately** before sending + # the api side request to ensure the ems maps the oid -> + # reqid correctly! + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=reqid, # our custom int mapping + account='binance', # piker account + ) + await ems_order_stream.send(resp) + + # SMH they do gen their own order id: ints.. + # assert reqid == order.oid + dids[order.oid] = reqid + + # track latest request state such that map + # lookups start at the most recent msg and then + # scan reverse-chronologically. + dialogs.add_msg(msg) + except BrokerError as be: await ems_order_stream.send( BrokerdError( @@ -190,10 +229,6 @@ async def open_trade_dialog( await ctx.started('paper') return - # table: PpTable - # ledger: TransactionLedger - - # TODO: load pps and accounts using accounting apis! async with ( open_cached_client('binance') as client, ): @@ -323,6 +358,20 @@ async def open_trade_dialog( await ctx.started((positions, list(accounts))) + dialogs = OrderDialogs() + dids: dict[str, int] = bidict() + + # TODO: further init setup things to get full EMS and + # .accounting support B) + # - live order loading via user stream subscription and + # update to the order dialog table. + # - position loading using `piker.accounting` subsys + # and comparison with binance's own position calcs. + # - load pps and accounts using accounting apis, write + # the ledger and account files + # - table: PpTable + # - ledger: TransactionLedger + async with ( trio.open_nursery() as tn, ctx.open_stream() as ems_stream, @@ -332,6 +381,8 @@ async def open_trade_dialog( handle_order_requests, ems_stream, client, + dids, + dialogs, ) tn.start_soon( handle_order_updates, @@ -368,6 +419,7 @@ async def handle_order_updates( ''' async for msg in wss: match msg: + log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}') # TODO: # POSITION update