Add actor-global "broker client" for tracking reqids
parent
7379dc03af
commit
ee8c00684b
|
@ -31,6 +31,7 @@ import time
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
|
Iterable,
|
||||||
Union,
|
Union,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -86,6 +87,33 @@ class TooFastEdit(Exception):
|
||||||
'Edit requests faster then api submissions'
|
'Edit requests faster then api submissions'
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: make this wrap the `Client` and `ws` instances
|
||||||
|
# and give it methods to submit cancel vs. add vs. edit
|
||||||
|
# requests?
|
||||||
|
class BrokerClient:
|
||||||
|
'''
|
||||||
|
Actor global, client-unique order manager API.
|
||||||
|
|
||||||
|
For now provides unique ``brokerd`` defined "request ids"
|
||||||
|
and "user reference" values to track ``kraken`` ws api order
|
||||||
|
dialogs.
|
||||||
|
|
||||||
|
'''
|
||||||
|
counter: Iterable = count(1)
|
||||||
|
_table: set[int] = set()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def new_reqid(cls) -> int:
|
||||||
|
for reqid in cls.counter:
|
||||||
|
if reqid not in cls._table:
|
||||||
|
cls._table.add(reqid)
|
||||||
|
return reqid
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def add_reqid(cls, reqid: int) -> None:
|
||||||
|
cls._table.add(reqid)
|
||||||
|
|
||||||
|
|
||||||
async def handle_order_requests(
|
async def handle_order_requests(
|
||||||
|
|
||||||
ws: NoBsWs,
|
ws: NoBsWs,
|
||||||
|
@ -105,7 +133,6 @@ async def handle_order_requests(
|
||||||
# XXX: UGH, let's unify this.. with ``msgspec``.
|
# XXX: UGH, let's unify this.. with ``msgspec``.
|
||||||
msg: dict[str, Any]
|
msg: dict[str, Any]
|
||||||
order: BrokerdOrder
|
order: BrokerdOrder
|
||||||
counter = count(1)
|
|
||||||
|
|
||||||
async for msg in ems_order_stream:
|
async for msg in ems_order_stream:
|
||||||
log.info(f'Rx order msg:\n{pformat(msg)}')
|
log.info(f'Rx order msg:\n{pformat(msg)}')
|
||||||
|
@ -178,7 +205,8 @@ async def handle_order_requests(
|
||||||
|
|
||||||
else:
|
else:
|
||||||
ep = 'addOrder'
|
ep = 'addOrder'
|
||||||
reqid = next(counter)
|
|
||||||
|
reqid = BrokerClient.new_reqid()
|
||||||
ids[order.oid] = reqid
|
ids[order.oid] = reqid
|
||||||
log.debug(
|
log.debug(
|
||||||
f"Adding order {reqid}\n"
|
f"Adding order {reqid}\n"
|
||||||
|
@ -798,6 +826,10 @@ async def handle_order_updates(
|
||||||
oid = str(reqid)
|
oid = str(reqid)
|
||||||
ids[oid] = reqid # NOTE!: str -> int
|
ids[oid] = reqid # NOTE!: str -> int
|
||||||
|
|
||||||
|
# ensure wtv reqid they give us we don't re-use on
|
||||||
|
# new order submissions to this actor's client.
|
||||||
|
BrokerClient.add_reqid(reqid)
|
||||||
|
|
||||||
# fill out ``Status`` + boxed ``Order``
|
# fill out ``Status`` + boxed ``Order``
|
||||||
status_msg = Status(
|
status_msg = Status(
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
|
|
Loading…
Reference in New Issue