Async-ify order client methods and some renaming

We previously only offered a sync API (which was recently renamed to
`.<meth>_nowait()` style) since initially all order control was from our
`OrderMode` Qt driven UI/UX. This adds the equivalent async methods for
both testing as well as eventual auto-strat driven control B)

Also includes a bunch of renaming:
- `OrderBook` -> `OrderClient`.
- better internal renaming of the client's mem chan vars and add a ref
  `._ems_stream: tractor.MsgStream`.
- drop `get_orders()` factory, just always check for the actor-global
  instance and always set the ems stream on that client (in case old one
  was closed).
pre_overruns_ctxcancelled
Tyler Goodlet 2023-03-30 16:21:18 -04:00
parent 5cb63a67e1
commit b8d7c05d74
2 changed files with 120 additions and 119 deletions

View File

@ -46,70 +46,86 @@ if TYPE_CHECKING:
) )
class OrderBook(Struct): class OrderClient(Struct):
'''EMS-client-side order book ctl and tracking. '''
EMS-client-side order book ctl and tracking.
A style similar to "model-view" is used here where this api is (A)sync API for submitting orders and alerts to the `emsd` service;
provided as a supervised control for an EMS actor which does all the this is the main control for execution management from client code.
hard/fast work of talking to brokers/exchanges to conduct
executions.
Currently, this is mostly for keeping local state to match the EMS
and use received events to trigger graphics updates.
''' '''
# IPC stream to `emsd` actor
_ems_stream: tractor.MsgStream
# mem channels used to relay order requests to the EMS daemon # mem channels used to relay order requests to the EMS daemon
_to_ems: trio.abc.SendChannel _to_relay_task: trio.abc.SendChannel
_from_order_book: trio.abc.ReceiveChannel _from_sync_order_client: trio.abc.ReceiveChannel
# history table
_sent_orders: dict[str, Order] = {} _sent_orders: dict[str, Order] = {}
def send_nowait( def send_nowait(
self, self,
msg: Order | dict, msg: Order | dict,
) -> dict: ) -> dict | Order:
'''
Sync version of ``.send()``.
'''
self._sent_orders[msg.oid] = msg self._sent_orders[msg.oid] = msg
self._to_ems.send_nowait(msg) self._to_relay_task.send_nowait(msg)
return msg return msg
# TODO: make this an async version.. async def send(
def send(
self, self,
msg: Order | dict, msg: Order | dict,
) -> dict: ) -> dict | Order:
log.warning('USE `.send_nowait()` instead!') '''
return self.send_nowait(msg) Send a new order msg async to the `emsd` service.
'''
self._sent_orders[msg.oid] = msg
await self._ems_stream.send(msg)
return msg
def update_nowait( def update_nowait(
self, self,
uuid: str, uuid: str,
**data: dict, **data: dict,
) -> dict: ) -> dict:
'''
Sync version of ``.update()``.
'''
cmd = self._sent_orders[uuid] cmd = self._sent_orders[uuid]
msg = cmd.copy(update=data) msg = cmd.copy(update=data)
self._sent_orders[uuid] = msg self._sent_orders[uuid] = msg
self._to_ems.send_nowait(msg) self._to_relay_task.send_nowait(msg)
return cmd return msg
# TODO: async meth for this! async def update(
# def update(
# self,
# uuid: str,
# **data: dict,
# ) -> dict:
# ...
def cancel_nowait(
self, self,
uuid: str, uuid: str,
) -> bool: **data: dict,
) -> dict:
''' '''
Cancel an order (or alert) in the EMS. Update an existing order dialog with a msg updated from
``update`` kwargs.
''' '''
cmd = self._sent_orders[uuid]
msg = cmd.copy(update=data)
self._sent_orders[uuid] = msg
await self._ems_stream.send(msg)
return msg
def _mk_cancel_msg(
self,
uuid: str,
) -> Cancel:
cmd = self._sent_orders.get(uuid) cmd = self._sent_orders.get(uuid)
if not cmd: if not cmd:
log.error( log.error(
@ -118,85 +134,76 @@ class OrderBook(Struct):
f'You should report this as a bug!' f'You should report this as a bug!'
) )
fqme = str(cmd.symbol) fqme = str(cmd.symbol)
msg = Cancel( return Cancel(
oid=uuid, oid=uuid,
symbol=fqme, symbol=fqme,
) )
self._to_ems.send_nowait(msg)
# TODO: make this an async version.. def cancel_nowait(
def cancel(
self, self,
uuid: str, uuid: str,
) -> bool:
log.warning('USE `.cancel_nowait()` instead!')
return self.cancel_nowait(uuid)
) -> None:
'''
Sync version of ``.cancel()``.
_orders: OrderBook = None '''
self._to_relay_task.send_nowait(
self._mk_cancel_msg(uuid)
def get_orders(
emsd_uid: tuple[str, str] = None
) -> OrderBook:
""""
OrderBook singleton factory per actor.
"""
if emsd_uid is not None:
# TODO: read in target emsd's active book on startup
pass
global _orders
if _orders is None:
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
_orders = OrderBook(
_to_ems=tx,
_from_order_book=brx,
) )
return _orders async def cancel(
self,
uuid: str,
) -> bool:
'''
Cancel an already existintg order (or alert) dialog.
'''
await self._ems_stream.send(
self._mk_cancel_msg(uuid)
)
# TODO: we can get rid of this relay loop once we move _client: OrderClient = None
# order_mode inputs to async code!
async def relay_order_cmds_from_sync_code(
async def relay_orders_from_sync_code(
client: OrderClient,
symbol_key: str, symbol_key: str,
to_ems_stream: tractor.MsgStream, to_ems_stream: tractor.MsgStream,
) -> None: ) -> None:
""" '''
Order streaming task: deliver orders transmitted from UI Order submission relay task: deliver orders sent from synchronous (UI)
to downstream consumers. code to the EMS via ``OrderClient._from_sync_order_client``.
This is run in the UI actor (usually the one running Qt but could be This is run in the UI actor (usually the one running Qt but could be
any other client service code). This process simply delivers order any other client service code). This process simply delivers order
messages to the above ``_to_ems`` send channel (from sync code using messages to the above ``_to_relay_task`` send channel (from sync code using
``.send_nowait()``), these values are pulled from the channel here ``.send_nowait()``), these values are pulled from the channel here
and relayed to any consumer(s) that called this function using and relayed to any consumer(s) that called this function using
a ``tractor`` portal. a ``tractor`` portal.
This effectively makes order messages look like they're being This effectively makes order messages look like they're being
"pushed" from the parent to the EMS where local sync code is likely "pushed" from the parent to the EMS where local sync code is likely
doing the pushing from some UI. doing the pushing from some non-async UI handler.
""" '''
book = get_orders() async with (
async with book._from_order_book.subscribe() as orders_stream: client._from_sync_order_client.subscribe() as sync_order_cmds
async for cmd in orders_stream: ):
async for cmd in sync_order_cmds:
sym = cmd.symbol sym = cmd.symbol
msg = pformat(cmd) msg = pformat(cmd)
if sym == symbol_key: if sym == symbol_key:
log.info(f'Send order cmd:\n{msg}') log.info(f'Send order cmd:\n{msg}')
# send msg over IPC / wire # send msg over IPC / wire
await to_ems_stream.send(cmd) await to_ems_stream.send(cmd)
else: else:
log.warning( log.warning(
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:' f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
@ -211,7 +218,7 @@ async def open_ems(
loglevel: str = 'error', loglevel: str = 'error',
) -> tuple[ ) -> tuple[
OrderBook, OrderClient,
tractor.MsgStream, tractor.MsgStream,
dict[ dict[
# brokername, acctid # brokername, acctid
@ -222,42 +229,15 @@ async def open_ems(
dict[str, Status], dict[str, Status],
]: ]:
''' '''
Spawn an EMS daemon and begin sending orders and receiving (Maybe) spawn an EMS-daemon (emsd), deliver an `OrderClient` for
alerts. requesting orders/alerts and a `trades_stream` which delivers all
response-msgs.
This EMS tries to reduce most broker's terrible order entry apis to This is a "client side" entrypoint which may spawn the `emsd` service
a very simple protocol built on a few easy to grok and/or if it can't be discovered and generally speaking is the lowest level
"rantsy" premises: broker control client-API.
- most users will prefer "dark mode" where orders are not submitted
to a broker until and execution condition is triggered
(aka client-side "hidden orders")
- Brokers over-complicate their apis and generally speaking hire
poor designers to create them. We're better off using creating a super
minimal, schema-simple, request-event-stream protocol to unify all the
existing piles of shit (and shocker, it'll probably just end up
looking like a decent crypto exchange's api)
- all order types can be implemented with client-side limit orders
- we aren't reinventing a wheel in this case since none of these
brokers are exposing FIX protocol; it is they doing the re-invention.
TODO: make some fancy diagrams using mermaid.io
the possible set of responses from the stream is currently:
- 'dark_submitted', 'broker_submitted'
- 'dark_cancelled', 'broker_cancelled'
- 'dark_executed', 'broker_executed'
- 'broker_filled'
''' '''
# wait for service to connect back to us signalling
# ready for order commands
book = get_orders()
broker, symbol, suffix = unpack_fqme(fqme) broker, symbol, suffix = unpack_fqme(fqme)
async with maybe_open_emsd(broker) as portal: async with maybe_open_emsd(broker) as portal:
@ -291,16 +271,34 @@ async def open_ems(
# open 2-way trade command stream # open 2-way trade command stream
ctx.open_stream() as trades_stream, ctx.open_stream() as trades_stream,
): ):
# use any pre-existing actor singleton client.
global _client
if _client is None:
size = 100
tx, rx = trio.open_memory_channel(size)
brx = broadcast_receiver(rx, size)
# setup local ui event streaming channels for request/resp
# streamging with EMS daemon
_client = OrderClient(
_ems_stream=trades_stream,
_to_relay_task=tx,
_from_sync_order_client=brx,
)
_client._ems_stream = trades_stream
# start sync code order msg delivery task # start sync code order msg delivery task
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon( n.start_soon(
relay_order_cmds_from_sync_code, relay_orders_from_sync_code,
_client,
fqme, fqme,
trades_stream trades_stream
) )
yield ( yield (
book, _client,
trades_stream, trades_stream,
positions, positions,
accounts, accounts,

View File

@ -40,7 +40,10 @@ from ..accounting import Position
from ..accounting._allocate import ( from ..accounting._allocate import (
mk_allocator, mk_allocator,
) )
from ..clearing._client import open_ems, OrderBook from ..clearing._client import (
open_ems,
OrderClient,
)
from ._style import _font from ._style import _font
from ..accounting._mktinfo import Symbol from ..accounting._mktinfo import Symbol
from ..data.feed import ( from ..data.feed import (
@ -120,7 +123,7 @@ class OrderMode:
chart: ChartPlotWidget # type: ignore # noqa chart: ChartPlotWidget # type: ignore # noqa
hist_chart: ChartPlotWidget # type: ignore # noqa hist_chart: ChartPlotWidget # type: ignore # noqa
nursery: trio.Nursery # used by ``ui._position`` code? nursery: trio.Nursery # used by ``ui._position`` code?
book: OrderBook book: OrderClient
lines: LineEditor lines: LineEditor
arrows: ArrowEditor arrows: ArrowEditor
multistatus: MultiStatus multistatus: MultiStatus
@ -679,7 +682,7 @@ async def open_order_mode(
multistatus = chart.window().status_bar multistatus = chart.window().status_bar
done = multistatus.open_status('starting order mode..') done = multistatus.open_status('starting order mode..')
book: OrderBook book: OrderClient
trades_stream: tractor.MsgStream trades_stream: tractor.MsgStream
# The keys in this dict **must** be in set our set of "normalized" # The keys in this dict **must** be in set our set of "normalized"
@ -923,7 +926,7 @@ async def process_trades_and_update_ui(
trades_stream: tractor.MsgStream, trades_stream: tractor.MsgStream,
mode: OrderMode, mode: OrderMode,
book: OrderBook, book: OrderClient,
) -> None: ) -> None:
@ -939,7 +942,7 @@ async def process_trades_and_update_ui(
async def process_trade_msg( async def process_trade_msg(
mode: OrderMode, mode: OrderMode,
book: OrderBook, book: OrderClient,
msg: dict, msg: dict,
) -> tuple[Dialog, Status]: ) -> tuple[Dialog, Status]: