Async-ify order client methods and some renaming

We previously only offered a sync API (which was recently renamed to
`.<meth>_nowait()` style) since initially all order control was from our
`OrderMode` Qt driven UI/UX. This adds the equivalent async methods for
both testing as well as eventual auto-strat driven control B)

Also includes a bunch of renaming:
- `OrderBook` -> `OrderClient`.
- better internal renaming of the client's mem chan vars and add a ref
  `._ems_stream: tractor.MsgStream`.
- drop `get_orders()` factory, just always check for the actor-global
  instance and always set the ems stream on that client (in case old one
  was closed).
rekt_pps
Tyler Goodlet 2023-03-30 16:21:18 -04:00
parent 48f096995f
commit 72abe98475
2 changed files with 120 additions and 119 deletions

View File

@ -46,70 +46,86 @@ if TYPE_CHECKING:
)
class OrderBook(Struct):
'''EMS-client-side order book ctl and tracking.
class OrderClient(Struct):
'''
EMS-client-side order book ctl and tracking.
A style similar to "model-view" is used here where this api is
provided as a supervised control for an EMS actor which does all the
hard/fast work of talking to brokers/exchanges to conduct
executions.
Currently, this is mostly for keeping local state to match the EMS
and use received events to trigger graphics updates.
(A)sync API for submitting orders and alerts to the `emsd` service;
this is the main control for execution management from client code.
'''
# IPC stream to `emsd` actor
_ems_stream: tractor.MsgStream
# mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel
_to_relay_task: trio.abc.SendChannel
_from_sync_order_client: trio.abc.ReceiveChannel
# history table
_sent_orders: dict[str, Order] = {}
def send_nowait(
self,
msg: Order | dict,
) -> dict:
) -> dict | Order:
'''
Sync version of ``.send()``.
'''
self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg)
self._to_relay_task.send_nowait(msg)
return msg
# TODO: make this an async version..
def send(
async def send(
self,
msg: Order | dict,
) -> dict:
log.warning('USE `.send_nowait()` instead!')
return self.send_nowait(msg)
) -> dict | Order:
'''
Send a new order msg async to the `emsd` service.
'''
self._sent_orders[msg.oid] = msg
await self._ems_stream.send(msg)
return msg
def update_nowait(
self,
uuid: str,
**data: dict,
) -> dict:
'''
Sync version of ``.update()``.
'''
cmd = self._sent_orders[uuid]
msg = cmd.copy(update=data)
self._sent_orders[uuid] = msg
self._to_ems.send_nowait(msg)
return cmd
self._to_relay_task.send_nowait(msg)
return msg
# TODO: async meth for this!
# def update(
# self,
# uuid: str,
# **data: dict,
# ) -> dict:
# ...
def cancel_nowait(
async def update(
self,
uuid: str,
) -> bool:
**data: dict,
) -> dict:
'''
Cancel an order (or alert) in the EMS.
Update an existing order dialog with a msg updated from
``update`` kwargs.
'''
cmd = self._sent_orders[uuid]
msg = cmd.copy(update=data)
self._sent_orders[uuid] = msg
await self._ems_stream.send(msg)
return msg
def _mk_cancel_msg(
self,
uuid: str,
) -> Cancel:
cmd = self._sent_orders.get(uuid)
if not cmd:
log.error(
@ -118,85 +134,76 @@ class OrderBook(Struct):
f'You should report this as a bug!'
)
fqme = str(cmd.symbol)
msg = Cancel(
return Cancel(
oid=uuid,
symbol=fqme,
)
self._to_ems.send_nowait(msg)
# TODO: make this an async version..
def cancel(
def cancel_nowait(
self,
uuid: str,
) -> bool:
log.warning('USE `.cancel_nowait()` instead!')
return self.cancel_nowait(uuid)
) -> None:
'''
Sync version of ``.cancel()``.
_orders: OrderBook = None
def get_orders(
emsd_uid: tuple[str, str] = None
) -> OrderBook:
""""
OrderBook singleton factory per actor.
"""
if emsd_uid is not None:
# TODO: read in target emsd's active book on startup
pass
global _orders
if _orders is None:
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
_orders = OrderBook(
_to_ems=tx,
_from_order_book=brx,
'''
self._to_relay_task.send_nowait(
self._mk_cancel_msg(uuid)
)
return _orders
async def cancel(
self,
uuid: str,
) -> bool:
'''
Cancel an already existintg order (or alert) dialog.
'''
await self._ems_stream.send(
self._mk_cancel_msg(uuid)
)
# TODO: we can get rid of this relay loop once we move
# order_mode inputs to async code!
async def relay_order_cmds_from_sync_code(
_client: OrderClient = None
async def relay_orders_from_sync_code(
client: OrderClient,
symbol_key: str,
to_ems_stream: tractor.MsgStream,
) -> None:
"""
Order streaming task: deliver orders transmitted from UI
to downstream consumers.
'''
Order submission relay task: deliver orders sent from synchronous (UI)
code to the EMS via ``OrderClient._from_sync_order_client``.
This is run in the UI actor (usually the one running Qt but could be
any other client service code). This process simply delivers order
messages to the above ``_to_ems`` send channel (from sync code using
messages to the above ``_to_relay_task`` send channel (from sync code using
``.send_nowait()``), these values are pulled from the channel here
and relayed to any consumer(s) that called this function using
a ``tractor`` portal.
This effectively makes order messages look like they're being
"pushed" from the parent to the EMS where local sync code is likely
doing the pushing from some UI.
doing the pushing from some non-async UI handler.
"""
book = get_orders()
async with book._from_order_book.subscribe() as orders_stream:
async for cmd in orders_stream:
'''
async with (
client._from_sync_order_client.subscribe() as sync_order_cmds
):
async for cmd in sync_order_cmds:
sym = cmd.symbol
msg = pformat(cmd)
if sym == symbol_key:
log.info(f'Send order cmd:\n{msg}')
# send msg over IPC / wire
await to_ems_stream.send(cmd)
else:
log.warning(
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
@ -211,7 +218,7 @@ async def open_ems(
loglevel: str = 'error',
) -> tuple[
OrderBook,
OrderClient,
tractor.MsgStream,
dict[
# brokername, acctid
@ -222,42 +229,15 @@ async def open_ems(
dict[str, Status],
]:
'''
Spawn an EMS daemon and begin sending orders and receiving
alerts.
(Maybe) spawn an EMS-daemon (emsd), deliver an `OrderClient` for
requesting orders/alerts and a `trades_stream` which delivers all
response-msgs.
This EMS tries to reduce most broker's terrible order entry apis to
a very simple protocol built on a few easy to grok and/or
"rantsy" premises:
- most users will prefer "dark mode" where orders are not submitted
to a broker until and execution condition is triggered
(aka client-side "hidden orders")
- Brokers over-complicate their apis and generally speaking hire
poor designers to create them. We're better off using creating a super
minimal, schema-simple, request-event-stream protocol to unify all the
existing piles of shit (and shocker, it'll probably just end up
looking like a decent crypto exchange's api)
- all order types can be implemented with client-side limit orders
- we aren't reinventing a wheel in this case since none of these
brokers are exposing FIX protocol; it is they doing the re-invention.
TODO: make some fancy diagrams using mermaid.io
the possible set of responses from the stream is currently:
- 'dark_submitted', 'broker_submitted'
- 'dark_cancelled', 'broker_cancelled'
- 'dark_executed', 'broker_executed'
- 'broker_filled'
This is a "client side" entrypoint which may spawn the `emsd` service
if it can't be discovered and generally speaking is the lowest level
broker control client-API.
'''
# wait for service to connect back to us signalling
# ready for order commands
book = get_orders()
broker, symbol, suffix = unpack_fqme(fqme)
async with maybe_open_emsd(broker) as portal:
@ -291,16 +271,34 @@ async def open_ems(
# open 2-way trade command stream
ctx.open_stream() as trades_stream,
):
# use any pre-existing actor singleton client.
global _client
if _client is None:
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
_client = OrderClient(
_ems_stream=trades_stream,
_to_relay_task=tx,
_from_sync_order_client=brx,
)
_client._ems_stream = trades_stream
# start sync code order msg delivery task
async with trio.open_nursery() as n:
n.start_soon(
relay_order_cmds_from_sync_code,
relay_orders_from_sync_code,
_client,
fqme,
trades_stream
)
yield (
book,
_client,
trades_stream,
positions,
accounts,

View File

@ -40,7 +40,10 @@ from ..accounting import Position
from ..accounting._allocate import (
mk_allocator,
)
from ..clearing._client import open_ems, OrderBook
from ..clearing._client import (
open_ems,
OrderClient,
)
from ._style import _font
from ..accounting._mktinfo import Symbol
from ..data.feed import (
@ -120,7 +123,7 @@ class OrderMode:
chart: ChartPlotWidget # type: ignore # noqa
hist_chart: ChartPlotWidget # type: ignore # noqa
nursery: trio.Nursery # used by ``ui._position`` code?
book: OrderBook
book: OrderClient
lines: LineEditor
arrows: ArrowEditor
multistatus: MultiStatus
@ -679,7 +682,7 @@ async def open_order_mode(
multistatus = chart.window().status_bar
done = multistatus.open_status('starting order mode..')
book: OrderBook
book: OrderClient
trades_stream: tractor.MsgStream
# The keys in this dict **must** be in set our set of "normalized"
@ -923,7 +926,7 @@ async def process_trades_and_update_ui(
trades_stream: tractor.MsgStream,
mode: OrderMode,
book: OrderBook,
book: OrderClient,
) -> None:
@ -939,7 +942,7 @@ async def process_trades_and_update_ui(
async def process_trade_msg(
mode: OrderMode,
book: OrderBook,
book: OrderClient,
msg: dict,
) -> tuple[Dialog, Status]: