diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 3c4f8479..8503e049 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -31,6 +31,7 @@ import time from typing import ( Any, AsyncIterator, + Iterable, Union, ) @@ -86,6 +87,33 @@ class TooFastEdit(Exception): 'Edit requests faster then api submissions' +# TODO: make this wrap the `Client` and `ws` instances +# and give it methods to submit cancel vs. add vs. edit +# requests? +class BrokerClient: + ''' + Actor global, client-unique order manager API. + + For now provides unique ``brokerd`` defined "request ids" + and "user reference" values to track ``kraken`` ws api order + dialogs. + + ''' + counter: Iterable = count(1) + _table: set[int] = set() + + @classmethod + def new_reqid(cls) -> int: + for reqid in cls.counter: + if reqid not in cls._table: + cls._table.add(reqid) + return reqid + + @classmethod + def add_reqid(cls, reqid: int) -> None: + cls._table.add(reqid) + + async def handle_order_requests( ws: NoBsWs, @@ -105,7 +133,6 @@ async def handle_order_requests( # XXX: UGH, let's unify this.. with ``msgspec``. msg: dict[str, Any] order: BrokerdOrder - counter = count(1) async for msg in ems_order_stream: log.info(f'Rx order msg:\n{pformat(msg)}') @@ -178,7 +205,8 @@ async def handle_order_requests( else: ep = 'addOrder' - reqid = next(counter) + + reqid = BrokerClient.new_reqid() ids[order.oid] = reqid log.debug( f"Adding order {reqid}\n" @@ -798,6 +826,10 @@ async def handle_order_updates( oid = str(reqid) ids[oid] = reqid # NOTE!: str -> int + # ensure wtv reqid they give us we don't re-use on + # new order submissions to this actor's client. + BrokerClient.add_reqid(reqid) + # fill out ``Status`` + boxed ``Order`` status_msg = Status( time_ns=time.time_ns(),