Factor `OrderDialogs` into `.clearing._util`

It's finally a decent little design / interface and definitely can be
used in other backends like `kraken` which rolled something lower level
but more or less the same without a wrapper class.
basic_buy_bot
Tyler Goodlet 2023-06-19 11:44:28 -04:00
parent 5c315ba163
commit 7f39de59d4
3 changed files with 66 additions and 64 deletions

View File

@ -22,10 +22,6 @@ Live order control B)
''' '''
from __future__ import annotations from __future__ import annotations
from collections import (
ChainMap,
defaultdict,
)
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, Any,
@ -45,7 +41,6 @@ from piker.accounting import (
from piker.brokers._util import ( from piker.brokers._util import (
get_logger, get_logger,
) )
from piker.data.types import Struct
from piker.data._web_bs import ( from piker.data._web_bs import (
open_autorecon_ws, open_autorecon_ws,
NoBsWs, NoBsWs,
@ -54,6 +49,7 @@ from piker.brokers import (
open_cached_client, open_cached_client,
BrokerError, BrokerError,
) )
from piker.clearing import OrderDialogs
from piker.clearing._messages import ( from piker.clearing._messages import (
BrokerdOrder, BrokerdOrder,
BrokerdOrderAck, BrokerdOrderAck,
@ -71,65 +67,6 @@ from .api import Client
log = get_logger('piker.brokers.binance') log = get_logger('piker.brokers.binance')
# TODO: factor this into `.clearing._util` (or something)
# and use in other backends like kraken which currently has
# a less formalized version more or less:
# `apiflows[reqid].maps.append(status_msg.to_dict())`
class OrderDialogs(Struct):
'''
Order control dialog (and thus transaction) tracking via
message recording.
Allows easily recording messages associated with a given set of
order control transactions and looking up the latest field
state using the entire (reverse chronological) msg flow.
'''
_flows: dict[str, ChainMap] = {}
def add_msg(
self,
oid: str,
msg: dict,
) -> None:
# NOTE: manually enter a new map on the first msg add to
# avoid creating one with an empty dict first entry in
# `ChainMap.maps` which is the default if none passed at
# init.
cm: ChainMap = self._flows.get(oid)
if cm:
cm.maps.insert(0, msg)
else:
cm = ChainMap(msg)
self._flows[oid] = cm
# TODO: wrap all this in the `collections.abc.Mapping` interface?
def get(
self,
oid: str,
) -> ChainMap[str, Any]:
'''
Return the dialog `ChainMap` for provided id.
'''
return self._flows.get(oid, None)
def pop(
self,
oid: str,
) -> ChainMap[str, Any]:
'''
Pop and thus remove the `ChainMap` containing the msg flow
for the given order id.
'''
return self._flows.pop(oid)
async def handle_order_requests( async def handle_order_requests(
ems_order_stream: tractor.MsgStream, ems_order_stream: tractor.MsgStream,
client: Client, client: Client,

View File

@ -26,12 +26,14 @@ from ._client import (
from ._ems import ( from ._ems import (
open_brokerd_dialog, open_brokerd_dialog,
) )
from ._util import OrderDialogs
__all__ = [ __all__ = [
'open_ems', 'open_ems',
'OrderClient', 'OrderClient',
'open_brokerd_dialog', 'open_brokerd_dialog',
'OrderDialogs',
] ]

View File

@ -17,12 +17,15 @@
Sub-sys module commons. Sub-sys module commons.
""" """
from collections import ChainMap
from functools import partial from functools import partial
from typing import Any
from ..log import ( from ..log import (
get_logger, get_logger,
get_console_log, get_console_log,
) )
from piker.data.types import Struct
subsys: str = 'piker.clearing' subsys: str = 'piker.clearing'
log = get_logger(subsys) log = get_logger(subsys)
@ -31,3 +34,63 @@ get_console_log = partial(
get_console_log, get_console_log,
name=subsys, name=subsys,
) )
# TODO: use this in other backends like kraken which currently has
# a less formalized version more or less:
# `apiflows[reqid].maps.append(status_msg.to_dict())`
class OrderDialogs(Struct):
'''
Order control dialog (and thus transaction) tracking via
message recording.
Allows easily recording messages associated with a given set of
order control transactions and looking up the latest field
state using the entire (reverse chronological) msg flow.
'''
_flows: dict[str, ChainMap] = {}
def add_msg(
self,
oid: str,
msg: dict,
) -> None:
# NOTE: manually enter a new map on the first msg add to
# avoid creating one with an empty dict first entry in
# `ChainMap.maps` which is the default if none passed at
# init.
cm: ChainMap = self._flows.get(oid)
if cm:
cm.maps.insert(0, msg)
else:
cm = ChainMap(msg)
self._flows[oid] = cm
# TODO: wrap all this in the `collections.abc.Mapping` interface?
def get(
self,
oid: str,
) -> ChainMap[str, Any]:
'''
Return the dialog `ChainMap` for provided id.
'''
return self._flows.get(oid, None)
def pop(
self,
oid: str,
) -> ChainMap[str, Any]:
'''
Pop and thus remove the `ChainMap` containing the msg flow
for the given order id.
'''
return self._flows.pop(oid)