diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 9b1c3246..b3b4679d 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -18,9 +18,10 @@ In da suit parlances: "Execution management systems" """ +from contextlib import asynccontextmanager +from dataclasses import dataclass, field from pprint import pformat import time -from dataclasses import dataclass, field from typing import AsyncIterator, Callable, Any from bidict import bidict @@ -31,6 +32,7 @@ import tractor from .. import data from ..log import get_logger from ..data._normalize import iterticks +from ..data.feed import Feed from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -254,30 +256,72 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') -# TODO: lots of cases still to handle -# XXX: right now this is very very ad-hoc to IB -# - short-sale but securities haven't been located, in this case we -# should probably keep the order in some kind of weird state or cancel -# it outright? -# status='PendingSubmit', message=''), -# status='Cancelled', message='Error 404, -# reqId 1550: Order held while securities are located.'), -# status='PreSubmitted', message='')], +class _Router(BaseModel): + '''Order router which manages per-broker dark books, alerts, + and clearing related data feed management. + + ''' + nursery: trio.Nursery + + feeds: dict[tuple[str, str], Any] = {} + books: dict[str, _DarkBook] = {} + dialogues: dict[str, list[tractor.MsgStream]] = {} + relays: dict[str, tuple[dict, tractor.MsgStream]] = {} + + class Config: + arbitrary_types_allowed = True + underscore_attrs_are_private = False + + def get_dark_book( + self, + brokername: str, + + ) -> _DarkBook: + + return self.books.setdefault(brokername, _DarkBook(brokername)) + + +_router: _Router = None + + +@tractor.context +async def _setup_persistent_emsd( + + ctx: tractor.Context, + +) -> None: + + global _router + + # open a root "service nursery" for the ``emsd`` actor + async with trio.open_nursery() as service_nursery: + + _router = _Router(nursery=service_nursery) + + # TODO: send back the full set of persistent orders/execs persistent + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + async def translate_and_relay_brokerd_events( broker: str, - ems_client_order_stream: tractor.MsgStream, + # ems_client_order_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream, book: _DarkBook, + router: _Router, ) -> AsyncIterator[dict]: - """Trades update loop - receive updates from broker, convert - to EMS responses, transmit to ordering client(s). + '''Trades update loop - receive updates from ``brokerd`` trades + endpoint, convert to EMS response msgs, transmit **only** to + ordering client(s). - This is where trade confirmations from the broker are processed - and appropriate responses relayed back to the original EMS client - actor. There is a messaging translation layer throughout. + This is where trade confirmations from the broker are processed and + appropriate responses relayed **only** back to the original EMS + client actor. There is a messaging translation layer throughout. Expected message translation(s): @@ -286,10 +330,10 @@ async def translate_and_relay_brokerd_events( 'status' -> relabel as 'broker_', if complete send 'executed' 'fill' -> 'broker_filled' - Currently accepted status values from IB: + Currently handled status values from IB: {'presubmitted', 'submitted', 'cancelled', 'inactive'} - """ + ''' async for brokerd_msg in brokerd_trades_stream: name = brokerd_msg['name'] @@ -298,10 +342,14 @@ async def translate_and_relay_brokerd_events( if name == 'position': - # relay through position msgs immediately - await ems_client_order_stream.send( - BrokerdPosition(**brokerd_msg).dict() - ) + # relay through position msgs immediately by + # broadcasting updates on all client streams + for oid, ems_client_order_stream in router.dialogues.items(): + + await ems_client_order_stream.send( + BrokerdPosition(**brokerd_msg).dict() + ) + continue # Get the broker (order) request id, this **must** be normalized @@ -331,7 +379,7 @@ async def translate_and_relay_brokerd_events( # may be an order msg specified as "external" to the # piker ems flow (i.e. generated by some other # external broker backend client (like tws for ib) - ext = brokerd_msg.get('external') + ext = brokerd_msg['broker_details'].get('external') if ext: log.error(f"External trade event {ext}") @@ -377,6 +425,7 @@ async def translate_and_relay_brokerd_events( resp = None broker_details = {} + client_flow_complete: bool = False if name in ( 'error', @@ -407,22 +456,13 @@ async def translate_and_relay_brokerd_events( elif name in ( 'status', ): - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # everyone doin camel case msg = BrokerdStatus(**brokerd_msg) + if msg.status == 'cancelled': + + client_flow_complete = True + log.info(f'Cancellation for {oid} is complete!') + if msg.status == 'filled': # conditional execution is fully complete, no more @@ -431,6 +471,9 @@ async def translate_and_relay_brokerd_events( resp = 'broker_executed' + # be sure to pop this stream from our dialogue set + # since the order dialogue should be done. + client_flow_complete = True log.info(f'Execution for {oid} is complete!') # just log it @@ -460,6 +503,7 @@ async def translate_and_relay_brokerd_events( # Create and relay response status message # to requesting EMS client + ems_client_order_stream = router.dialogues[oid] await ems_client_order_stream.send( Status( oid=oid, @@ -470,6 +514,9 @@ async def translate_and_relay_brokerd_events( ).dict() ) + if client_flow_complete: + router.dialogues.pop(oid) + async def process_client_order_cmds( @@ -477,8 +524,9 @@ async def process_client_order_cmds( brokerd_order_stream: tractor.MsgStream, symbol: str, - feed: 'Feed', # noqa + feed: Feed, # noqa dark_book: _DarkBook, + router: _Router, ) -> None: @@ -489,6 +537,16 @@ async def process_client_order_cmds( action = cmd['action'] oid = cmd['oid'] + + # register this stream as an active dialogue for this order id + # such that translated message from the brokerd backend can be + # routed (relayed) to **just** that client stream (and in theory + # others who are registered for such order affiliated msgs). + + # TODO: make ``tractor.MsgStream`` a frozen type again such that it + # can be stored in sets like the old context was. + router.dialogues[oid] = client_order_stream + reqid = dark_book._ems2brokerd_ids.inverse.get(oid) live_entry = dark_book._ems_entries.get(oid) @@ -499,14 +557,17 @@ async def process_client_order_cmds( # check for live-broker order if live_entry: + reqid = reqid or live_entry.reqid + assert reqid + msg = BrokerdCancel( oid=oid, - reqid=reqid or live_entry.reqid, + reqid=reqid, time_ns=time.time_ns(), ) # send cancel to brokerd immediately! - log.info("Submitting cancel for live order") + log.info("Submitting cancel for live order {reqid}") # NOTE: cancel response will be relayed back in messages # from corresponding broker @@ -515,7 +576,8 @@ async def process_client_order_cmds( else: # this might be a cancel for an order that hasn't been # acked yet by a brokerd, so register a cancel for when - # the order ack does show up later + # the order ack does show up later such that the brokerd + # order request can be cancelled at that time. dark_book._ems_entries[oid] = msg # check for EMS active exec @@ -532,6 +594,8 @@ async def process_client_order_cmds( time_ns=time.time_ns(), ).dict() ) + # de-register this client dialogue + router.dialogues.pop(oid) except KeyError: log.exception(f'No dark order for {symbol}?') @@ -581,17 +645,22 @@ async def process_client_order_cmds( log.info(f'Sending live order to {broker}:\n{pformat(msg)}') await brokerd_order_stream.send(msg.dict()) - # an immediate response should be brokerd ack with order - # id but we register our request as part of the flow + # an immediate response should be ``BrokerdOrderAck`` + # with ems order id from the ``trades_dialogue()`` + # endpoint, but we register our request as part of the + # flow so that if a cancel comes from the requesting + # client, before that ack, when the ack does arrive we + # immediately take the reqid from the broker and cancel + # that order with them immediately. dark_book._ems_entries[oid] = msg + # "DARK" triggers + # submit order to local EMS book and scan loop, + # effectively a local clearing engine, which + # scans for conditions and triggers matching executions elif exec_mode in ('dark', 'paper') or ( action in ('alert') ): - # submit order to local EMS book and scan loop, - # effectively a local clearing engine, which - # scans for conditions and triggers matching executions - # Auto-gen scanner predicate: # we automatically figure out what the alert check # condition should be based on the current first @@ -637,11 +706,11 @@ async def process_client_order_cmds( percent_away, abs_diff_away ) + resp = 'dark_submitted' + # alerts have special msgs to distinguish if action == 'alert': resp = 'alert_submitted' - else: - resp = 'dark_submitted' await client_order_stream.send( Status( @@ -652,6 +721,97 @@ async def process_client_order_cmds( ) +@asynccontextmanager +async def maybe_open_brokerd_trades_dialogue( + + router: _Router, + feed: Feed, + broker: str, + symbol: str, + dark_book: _DarkBook, + _exec_mode: str, + loglevel: str, + +) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) + portal = feed._brokerd_portal + + if broker in _router.relays: + + positions, brokerd_trades_stream = _router.relays[broker] + + # TODO: get updated positions here? + yield positions, brokerd_trades_stream + return + + if trades_endpoint is None or _exec_mode == 'paper': + + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running a + # paper-simulator clearing engine. + + # load the paper trading engine + _exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + broker=broker, + symbol=symbol, + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + async with ( + + open_trades_endpoint as (brokerd_ctx, positions), + brokerd_ctx.open_stream() as brokerd_trades_stream, + trio.open_nursery() as n, + + ): + # XXX: really we only want one stream per `emsd` actor + # to relay global `brokerd` order events unless we're + # doing to expect each backend to relay only orders + # affiliated with a particular ``trades_dialogue()`` + # session (seems annoying for implementers). So, here + # we cache the relay task and instead of running multiple + # tasks (which will result in multiples of the same msg being + # relayed for each EMS client) we just register each client + # stream to this single relay loop using _router.dialogues + + # begin processing order events from the target brokerd backend + # by receiving order submission response messages, + # normalizing them to EMS messages and relaying back to + # the piker order client set. + + n.start_soon( + + translate_and_relay_brokerd_events, + + broker, + # ems_client_order_stream, + brokerd_trades_stream, + dark_book, + _router, + ) + + _router.relays[broker] = (positions, brokerd_trades_stream) + + yield positions, brokerd_trades_stream + + @tractor.context async def _emsd_main( @@ -697,6 +857,8 @@ async def _emsd_main( ''' global _router + assert _router + dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, @@ -711,8 +873,6 @@ async def _emsd_main( # spawn one task per broker feed async with ( - trio.open_nursery() as n, - # TODO: eventually support N-brokers data.open_feed( broker, @@ -732,39 +892,24 @@ async def _emsd_main( book = _router.get_dark_book(broker) book.lasts[(broker, symbol)] = first_quote[symbol]['last'] - trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) - portal = feed._brokerd_portal - - if trades_endpoint is None or _exec_mode == 'paper': - - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running a - # paper-simulator clearing engine. - - # load the paper trading engine - _exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - broker=broker, - symbol=symbol, - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - async with ( - open_trades_endpoint as (brokerd_ctx, positions), - brokerd_ctx.open_stream() as brokerd_trades_stream, + + # only open if one isn't already up: we try to keep + # as few duplicate streams as necessary + maybe_open_brokerd_trades_dialogue( + _router, + feed, + broker, + symbol, + dark_book, + _exec_mode, + loglevel, + + ) as (positions, brokerd_trades_stream), + + trio.open_nursery() as n, ): + # signal to client that we're started # TODO: we could eventually send back **all** brokerd # positions here? @@ -787,72 +932,13 @@ async def _emsd_main( book ) - # begin processing order events from the target brokerd backend - # by receiving order submission response messages, - # normalizing them to EMS messages and relaying back to - # the piker order client. - n.start_soon( - translate_and_relay_brokerd_events, - - broker, - ems_client_order_stream, - brokerd_trades_stream, - dark_book, - ) - # start inbound (from attached client) order request processing await process_client_order_cmds( + ems_client_order_stream, brokerd_trades_stream, symbol, feed, dark_book, + _router, ) - - -class _Router(BaseModel): - '''Order router which manages per-broker dark books, alerts, - and clearing related data feed management. - - ''' - nursery: trio.Nursery - - feeds: dict[tuple[str, str], Any] = {} - books: dict[str, _DarkBook] = {} - - class Config: - arbitrary_types_allowed = True - underscore_attrs_are_private = False - - def get_dark_book( - self, - brokername: str, - - ) -> _DarkBook: - - return self.books.setdefault(brokername, _DarkBook(brokername)) - - -_router: _Router = None - - -@tractor.context -async def _setup_persistent_emsd( - - ctx: tractor.Context, - -) -> None: - - global _router - - # open a root "service nursery" for the ``emsd`` actor - async with trio.open_nursery() as service_nursery: - - _router = _Router(nursery=service_nursery) - - # TODO: send back the full set of persistent orders/execs persistent - await ctx.started() - - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever()