From 7f39de59d4e427db21c99152071456139f3acbcf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Jun 2023 11:44:28 -0400 Subject: [PATCH] Factor `OrderDialogs` into `.clearing._util` It's finally a decent little design / interface and definitely can be used in other backends like `kraken` which rolled something lower level but more or less the same without a wrapper class. --- piker/brokers/binance/broker.py | 65 +-------------------------------- piker/clearing/__init__.py | 2 + piker/clearing/_util.py | 63 ++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 64 deletions(-) diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index dfc8373c..6011aa9d 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -22,10 +22,6 @@ Live order control B) ''' from __future__ import annotations -from collections import ( - ChainMap, - defaultdict, -) from pprint import pformat from typing import ( Any, @@ -45,7 +41,6 @@ from piker.accounting import ( from piker.brokers._util import ( get_logger, ) -from piker.data.types import Struct from piker.data._web_bs import ( open_autorecon_ws, NoBsWs, @@ -54,6 +49,7 @@ from piker.brokers import ( open_cached_client, BrokerError, ) +from piker.clearing import OrderDialogs from piker.clearing._messages import ( BrokerdOrder, BrokerdOrderAck, @@ -71,65 +67,6 @@ from .api import Client log = get_logger('piker.brokers.binance') -# TODO: factor this into `.clearing._util` (or something) -# and use in other backends like kraken which currently has -# a less formalized version more or less: -# `apiflows[reqid].maps.append(status_msg.to_dict())` -class OrderDialogs(Struct): - ''' - Order control dialog (and thus transaction) tracking via - message recording. - - Allows easily recording messages associated with a given set of - order control transactions and looking up the latest field - state using the entire (reverse chronological) msg flow. - - ''' - _flows: dict[str, ChainMap] = {} - - def add_msg( - self, - oid: str, - msg: dict, - ) -> None: - - # NOTE: manually enter a new map on the first msg add to - # avoid creating one with an empty dict first entry in - # `ChainMap.maps` which is the default if none passed at - # init. - cm: ChainMap = self._flows.get(oid) - if cm: - cm.maps.insert(0, msg) - else: - cm = ChainMap(msg) - self._flows[oid] = cm - - # TODO: wrap all this in the `collections.abc.Mapping` interface? - def get( - self, - oid: str, - - ) -> ChainMap[str, Any]: - ''' - Return the dialog `ChainMap` for provided id. - - ''' - return self._flows.get(oid, None) - - def pop( - self, - oid: str, - - ) -> ChainMap[str, Any]: - ''' - Pop and thus remove the `ChainMap` containing the msg flow - for the given order id. - - ''' - return self._flows.pop(oid) - - - async def handle_order_requests( ems_order_stream: tractor.MsgStream, client: Client, diff --git a/piker/clearing/__init__.py b/piker/clearing/__init__.py index ec796ac9..19d6390f 100644 --- a/piker/clearing/__init__.py +++ b/piker/clearing/__init__.py @@ -26,12 +26,14 @@ from ._client import ( from ._ems import ( open_brokerd_dialog, ) +from ._util import OrderDialogs __all__ = [ 'open_ems', 'OrderClient', 'open_brokerd_dialog', + 'OrderDialogs', ] diff --git a/piker/clearing/_util.py b/piker/clearing/_util.py index ec93512d..9015ba69 100644 --- a/piker/clearing/_util.py +++ b/piker/clearing/_util.py @@ -17,12 +17,15 @@ Sub-sys module commons. """ +from collections import ChainMap from functools import partial +from typing import Any from ..log import ( get_logger, get_console_log, ) +from piker.data.types import Struct subsys: str = 'piker.clearing' log = get_logger(subsys) @@ -31,3 +34,63 @@ get_console_log = partial( get_console_log, name=subsys, ) + + +# TODO: use this in other backends like kraken which currently has +# a less formalized version more or less: +# `apiflows[reqid].maps.append(status_msg.to_dict())` +class OrderDialogs(Struct): + ''' + Order control dialog (and thus transaction) tracking via + message recording. + + Allows easily recording messages associated with a given set of + order control transactions and looking up the latest field + state using the entire (reverse chronological) msg flow. + + ''' + _flows: dict[str, ChainMap] = {} + + def add_msg( + self, + oid: str, + msg: dict, + ) -> None: + + # NOTE: manually enter a new map on the first msg add to + # avoid creating one with an empty dict first entry in + # `ChainMap.maps` which is the default if none passed at + # init. + cm: ChainMap = self._flows.get(oid) + if cm: + cm.maps.insert(0, msg) + else: + cm = ChainMap(msg) + self._flows[oid] = cm + + # TODO: wrap all this in the `collections.abc.Mapping` interface? + def get( + self, + oid: str, + + ) -> ChainMap[str, Any]: + ''' + Return the dialog `ChainMap` for provided id. + + ''' + return self._flows.get(oid, None) + + def pop( + self, + oid: str, + + ) -> ChainMap[str, Any]: + ''' + Pop and thus remove the `ChainMap` containing the msg flow + for the given order id. + + ''' + return self._flows.pop(oid) + + +