diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 14c77d54..8f531d6d 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -46,70 +46,86 @@ if TYPE_CHECKING: ) -class OrderBook(Struct): - '''EMS-client-side order book ctl and tracking. +class OrderClient(Struct): + ''' + EMS-client-side order book ctl and tracking. - A style similar to "model-view" is used here where this api is - provided as a supervised control for an EMS actor which does all the - hard/fast work of talking to brokers/exchanges to conduct - executions. - - Currently, this is mostly for keeping local state to match the EMS - and use received events to trigger graphics updates. + (A)sync API for submitting orders and alerts to the `emsd` service; + this is the main control for execution management from client code. ''' + # IPC stream to `emsd` actor + _ems_stream: tractor.MsgStream + # mem channels used to relay order requests to the EMS daemon - _to_ems: trio.abc.SendChannel - _from_order_book: trio.abc.ReceiveChannel + _to_relay_task: trio.abc.SendChannel + _from_sync_order_client: trio.abc.ReceiveChannel + + # history table _sent_orders: dict[str, Order] = {} def send_nowait( self, msg: Order | dict, - ) -> dict: + ) -> dict | Order: + ''' + Sync version of ``.send()``. + + ''' self._sent_orders[msg.oid] = msg - self._to_ems.send_nowait(msg) + self._to_relay_task.send_nowait(msg) return msg - # TODO: make this an async version.. - def send( + async def send( self, msg: Order | dict, - ) -> dict: - log.warning('USE `.send_nowait()` instead!') - return self.send_nowait(msg) + ) -> dict | Order: + ''' + Send a new order msg async to the `emsd` service. + + ''' + self._sent_orders[msg.oid] = msg + await self._ems_stream.send(msg) + return msg def update_nowait( self, - uuid: str, **data: dict, ) -> dict: + ''' + Sync version of ``.update()``. + + ''' cmd = self._sent_orders[uuid] msg = cmd.copy(update=data) self._sent_orders[uuid] = msg - self._to_ems.send_nowait(msg) - return cmd + self._to_relay_task.send_nowait(msg) + return msg - # TODO: async meth for this! - # def update( - # self, - # uuid: str, - # **data: dict, - # ) -> dict: - # ... - - def cancel_nowait( + async def update( self, uuid: str, - ) -> bool: + **data: dict, + ) -> dict: ''' - Cancel an order (or alert) in the EMS. + Update an existing order dialog with a msg updated from + ``update`` kwargs. ''' + cmd = self._sent_orders[uuid] + msg = cmd.copy(update=data) + self._sent_orders[uuid] = msg + await self._ems_stream.send(msg) + return msg + + def _mk_cancel_msg( + self, + uuid: str, + ) -> Cancel: cmd = self._sent_orders.get(uuid) if not cmd: log.error( @@ -118,85 +134,76 @@ class OrderBook(Struct): f'You should report this as a bug!' ) fqme = str(cmd.symbol) - msg = Cancel( + return Cancel( oid=uuid, symbol=fqme, ) - self._to_ems.send_nowait(msg) - # TODO: make this an async version.. - def cancel( + def cancel_nowait( self, uuid: str, - ) -> bool: - log.warning('USE `.cancel_nowait()` instead!') - return self.cancel_nowait(uuid) + ) -> None: + ''' + Sync version of ``.cancel()``. -_orders: OrderBook = None - - -def get_orders( - emsd_uid: tuple[str, str] = None -) -> OrderBook: - """" - OrderBook singleton factory per actor. - - """ - if emsd_uid is not None: - # TODO: read in target emsd's active book on startup - pass - - global _orders - - if _orders is None: - size = 100 - tx, rx = trio.open_memory_channel(size) - brx = broadcast_receiver(rx, size) - - # setup local ui event streaming channels for request/resp - # streamging with EMS daemon - _orders = OrderBook( - _to_ems=tx, - _from_order_book=brx, + ''' + self._to_relay_task.send_nowait( + self._mk_cancel_msg(uuid) ) - return _orders + async def cancel( + self, + uuid: str, + + ) -> bool: + ''' + Cancel an already existintg order (or alert) dialog. + + ''' + await self._ems_stream.send( + self._mk_cancel_msg(uuid) + ) -# TODO: we can get rid of this relay loop once we move -# order_mode inputs to async code! -async def relay_order_cmds_from_sync_code( +_client: OrderClient = None + +async def relay_orders_from_sync_code( + + client: OrderClient, symbol_key: str, to_ems_stream: tractor.MsgStream, ) -> None: - """ - Order streaming task: deliver orders transmitted from UI - to downstream consumers. + ''' + Order submission relay task: deliver orders sent from synchronous (UI) + code to the EMS via ``OrderClient._from_sync_order_client``. This is run in the UI actor (usually the one running Qt but could be any other client service code). This process simply delivers order - messages to the above ``_to_ems`` send channel (from sync code using + messages to the above ``_to_relay_task`` send channel (from sync code using ``.send_nowait()``), these values are pulled from the channel here and relayed to any consumer(s) that called this function using a ``tractor`` portal. This effectively makes order messages look like they're being "pushed" from the parent to the EMS where local sync code is likely - doing the pushing from some UI. + doing the pushing from some non-async UI handler. - """ - book = get_orders() - async with book._from_order_book.subscribe() as orders_stream: - async for cmd in orders_stream: + ''' + async with ( + client._from_sync_order_client.subscribe() as sync_order_cmds + ): + async for cmd in sync_order_cmds: sym = cmd.symbol msg = pformat(cmd) + if sym == symbol_key: log.info(f'Send order cmd:\n{msg}') # send msg over IPC / wire await to_ems_stream.send(cmd) + else: log.warning( f'Ignoring unmatched order cmd for {sym} != {symbol_key}:' @@ -211,7 +218,7 @@ async def open_ems( loglevel: str = 'error', ) -> tuple[ - OrderBook, + OrderClient, tractor.MsgStream, dict[ # brokername, acctid @@ -222,42 +229,15 @@ async def open_ems( dict[str, Status], ]: ''' - Spawn an EMS daemon and begin sending orders and receiving - alerts. + (Maybe) spawn an EMS-daemon (emsd), deliver an `OrderClient` for + requesting orders/alerts and a `trades_stream` which delivers all + response-msgs. - This EMS tries to reduce most broker's terrible order entry apis to - a very simple protocol built on a few easy to grok and/or - "rantsy" premises: - - - most users will prefer "dark mode" where orders are not submitted - to a broker until and execution condition is triggered - (aka client-side "hidden orders") - - - Brokers over-complicate their apis and generally speaking hire - poor designers to create them. We're better off using creating a super - minimal, schema-simple, request-event-stream protocol to unify all the - existing piles of shit (and shocker, it'll probably just end up - looking like a decent crypto exchange's api) - - - all order types can be implemented with client-side limit orders - - - we aren't reinventing a wheel in this case since none of these - brokers are exposing FIX protocol; it is they doing the re-invention. - - - TODO: make some fancy diagrams using mermaid.io - - the possible set of responses from the stream is currently: - - 'dark_submitted', 'broker_submitted' - - 'dark_cancelled', 'broker_cancelled' - - 'dark_executed', 'broker_executed' - - 'broker_filled' + This is a "client side" entrypoint which may spawn the `emsd` service + if it can't be discovered and generally speaking is the lowest level + broker control client-API. ''' - # wait for service to connect back to us signalling - # ready for order commands - book = get_orders() - broker, symbol, suffix = unpack_fqme(fqme) async with maybe_open_emsd(broker) as portal: @@ -291,16 +271,34 @@ async def open_ems( # open 2-way trade command stream ctx.open_stream() as trades_stream, ): + # use any pre-existing actor singleton client. + global _client + if _client is None: + size = 100 + tx, rx = trio.open_memory_channel(size) + brx = broadcast_receiver(rx, size) + + # setup local ui event streaming channels for request/resp + # streamging with EMS daemon + _client = OrderClient( + _ems_stream=trades_stream, + _to_relay_task=tx, + _from_sync_order_client=brx, + ) + + _client._ems_stream = trades_stream + # start sync code order msg delivery task async with trio.open_nursery() as n: n.start_soon( - relay_order_cmds_from_sync_code, + relay_orders_from_sync_code, + _client, fqme, trades_stream ) yield ( - book, + _client, trades_stream, positions, accounts, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 61fdb6d9..0013891c 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -40,7 +40,10 @@ from ..accounting import Position from ..accounting._allocate import ( mk_allocator, ) -from ..clearing._client import open_ems, OrderBook +from ..clearing._client import ( + open_ems, + OrderClient, +) from ._style import _font from ..accounting._mktinfo import Symbol from ..data.feed import ( @@ -120,7 +123,7 @@ class OrderMode: chart: ChartPlotWidget # type: ignore # noqa hist_chart: ChartPlotWidget # type: ignore # noqa nursery: trio.Nursery # used by ``ui._position`` code? - book: OrderBook + book: OrderClient lines: LineEditor arrows: ArrowEditor multistatus: MultiStatus @@ -679,7 +682,7 @@ async def open_order_mode( multistatus = chart.window().status_bar done = multistatus.open_status('starting order mode..') - book: OrderBook + book: OrderClient trades_stream: tractor.MsgStream # The keys in this dict **must** be in set our set of "normalized" @@ -923,7 +926,7 @@ async def process_trades_and_update_ui( trades_stream: tractor.MsgStream, mode: OrderMode, - book: OrderBook, + book: OrderClient, ) -> None: @@ -939,7 +942,7 @@ async def process_trades_and_update_ui( async def process_trade_msg( mode: OrderMode, - book: OrderBook, + book: OrderClient, msg: dict, ) -> tuple[Dialog, Status]: