diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 9b1c3246..7dbbfbad 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -18,19 +18,23 @@ In da suit parlances: "Execution management systems" """ +from contextlib import asynccontextmanager +from dataclasses import dataclass, field from pprint import pformat import time -from dataclasses import dataclass, field from typing import AsyncIterator, Callable, Any from bidict import bidict from pydantic import BaseModel import trio +from trio_typing import TaskStatus import tractor from .. import data from ..log import get_logger from ..data._normalize import iterticks +from ..data.feed import Feed +from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -80,15 +84,16 @@ def mk_check( @dataclass class _DarkBook: - """Client-side execution book. + '''EMS-trigger execution book. - Contains conditions for executions (aka "orders") which are not - exposed to brokers and thus the market; i.e. these are privacy - focussed "client side" orders. + Contains conditions for executions (aka "orders" or "triggers") + which are not exposed to brokers and thus the market; i.e. these are + privacy focussed "client side" orders which are submitted in real-time + based on specified trigger conditions. - A singleton instance is created per EMS actor (for now). + An instance per `brokerd` is created per EMS actor (for now). - """ + ''' broker: str # levels which have an executable action (eg. alert, order, signal) @@ -254,30 +259,237 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') -# TODO: lots of cases still to handle -# XXX: right now this is very very ad-hoc to IB -# - short-sale but securities haven't been located, in this case we -# should probably keep the order in some kind of weird state or cancel -# it outright? -# status='PendingSubmit', message=''), -# status='Cancelled', message='Error 404, -# reqId 1550: Order held while securities are located.'), -# status='PreSubmitted', message='')], +@dataclass +class TradesRelay: + brokerd_dialogue: tractor.MsgStream + positions: dict[str, float] + consumers: int = 0 + + +class _Router(BaseModel): + '''Order router which manages and tracks per-broker dark book, + alerts, clearing and related data feed management. + + A singleton per ``emsd`` actor. + + ''' + # setup at actor spawn time + nursery: trio.Nursery + + feeds: dict[tuple[str, str], Any] = {} + + # broker to book map + books: dict[str, _DarkBook] = {} + + # order id to client stream map + clients: set[tractor.MsgStream] = set() + dialogues: dict[str, list[tractor.MsgStream]] = {} + + # brokername to trades-dialogues streams with ``brokerd`` actors + relays: dict[str, TradesRelay] = {} + + class Config: + arbitrary_types_allowed = True + underscore_attrs_are_private = False + + def get_dark_book( + self, + brokername: str, + + ) -> _DarkBook: + + return self.books.setdefault(brokername, _DarkBook(brokername)) + + @asynccontextmanager + async def maybe_open_brokerd_trades_dialogue( + + self, + feed: Feed, + symbol: str, + dark_book: _DarkBook, + _exec_mode: str, + loglevel: str, + + ) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + relay = self.relays.get(feed.mod.name) + + if relay is None: + + relay = await self.nursery.start( + open_brokerd_trades_dialogue, + self, + feed, + symbol, + _exec_mode, + loglevel, + ) + + relay.consumers += 1 + + # TODO: get updated positions here? + assert relay.brokerd_dialogue + try: + yield relay + + finally: + + # TODO: what exactly needs to be torn down here or + # are we just consumer tracking? + + relay.consumers -= 1 + + +_router: _Router = None + + +async def open_brokerd_trades_dialogue( + + router: _Router, + feed: Feed, + symbol: str, + _exec_mode: str, + loglevel: str, + + task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, + +) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) + + broker = feed.mod.name + + # TODO: make a `tractor` bug/test for this! + # portal = feed._brokerd_portal + + # XXX: we must have our own portal + channel otherwise + # when the data feed closes it may result in a half-closed + # channel that the brokerd side thinks is still open somehow!? + async with maybe_spawn_brokerd( + + broker, + loglevel=loglevel, + + ) as portal: + + if trades_endpoint is None or _exec_mode == 'paper': + + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running a + # paper-simulator clearing engine. + + # load the paper trading engine + _exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + broker=broker, + symbol=symbol, + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + try: + async with ( + + open_trades_endpoint as (brokerd_ctx, positions), + brokerd_ctx.open_stream() as brokerd_trades_stream, + + ): + # XXX: really we only want one stream per `emsd` actor + # to relay global `brokerd` order events unless we're + # doing to expect each backend to relay only orders + # affiliated with a particular ``trades_dialogue()`` + # session (seems annoying for implementers). So, here + # we cache the relay task and instead of running multiple + # tasks (which will result in multiples of the same msg being + # relayed for each EMS client) we just register each client + # stream to this single relay loop using _router.dialogues + + # begin processing order events from the target brokerd backend + # by receiving order submission response messages, + # normalizing them to EMS messages and relaying back to + # the piker order client set. + + relay = TradesRelay( + brokerd_dialogue=brokerd_trades_stream, + positions=positions, + consumers=1 + ) + + _router.relays[broker] = relay + + # the ems scan loop may be cancelled by the client but we + # want to keep the ``brokerd`` dialogue up regardless + + task_status.started(relay) + + await translate_and_relay_brokerd_events( + broker, + brokerd_trades_stream, + _router, + ) + + # this context should block here indefinitely until + # the ``brokerd`` task either dies or is cancelled + + finally: + # parent context must have been closed + # remove from cache so next client will respawn if needed + _router.relays.pop(broker) + + +@tractor.context +async def _setup_persistent_emsd( + + ctx: tractor.Context, + +) -> None: + + global _router + + # open a root "service nursery" for the ``emsd`` actor + async with trio.open_nursery() as service_nursery: + + _router = _Router(nursery=service_nursery) + + # TODO: send back the full set of persistent + # orders/execs? + await ctx.started() + + # allow service tasks to run until cancelled + await trio.sleep_forever() + async def translate_and_relay_brokerd_events( broker: str, - ems_client_order_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream, - book: _DarkBook, + router: _Router, ) -> AsyncIterator[dict]: - """Trades update loop - receive updates from broker, convert - to EMS responses, transmit to ordering client(s). + '''Trades update loop - receive updates from ``brokerd`` trades + endpoint, convert to EMS response msgs, transmit **only** to + ordering client(s). - This is where trade confirmations from the broker are processed - and appropriate responses relayed back to the original EMS client - actor. There is a messaging translation layer throughout. + This is where trade confirmations from the broker are processed and + appropriate responses relayed **only** back to the original EMS + client actor. There is a messaging translation layer throughout. Expected message translation(s): @@ -286,10 +498,15 @@ async def translate_and_relay_brokerd_events( 'status' -> relabel as 'broker_', if complete send 'executed' 'fill' -> 'broker_filled' - Currently accepted status values from IB: + Currently handled status values from IB: {'presubmitted', 'submitted', 'cancelled', 'inactive'} - """ + ''' + book = router.get_dark_book(broker) + relay = router.relays[broker] + + assert relay.brokerd_dialogue == brokerd_trades_stream + async for brokerd_msg in brokerd_trades_stream: name = brokerd_msg['name'] @@ -298,10 +515,16 @@ async def translate_and_relay_brokerd_events( if name == 'position': - # relay through position msgs immediately - await ems_client_order_stream.send( - BrokerdPosition(**brokerd_msg).dict() - ) + pos_msg = BrokerdPosition(**brokerd_msg).dict() + + # keep up to date locally in ``emsd`` + relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg) + + # relay through position msgs immediately by + # broadcasting updates on all client streams + for client_stream in router.clients: + await client_stream.send(pos_msg) + continue # Get the broker (order) request id, this **must** be normalized @@ -331,7 +554,7 @@ async def translate_and_relay_brokerd_events( # may be an order msg specified as "external" to the # piker ems flow (i.e. generated by some other # external broker backend client (like tws for ib) - ext = brokerd_msg.get('external') + ext = brokerd_msg['broker_details'].get('external') if ext: log.error(f"External trade event {ext}") @@ -407,22 +630,12 @@ async def translate_and_relay_brokerd_events( elif name in ( 'status', ): - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # everyone doin camel case msg = BrokerdStatus(**brokerd_msg) + if msg.status == 'cancelled': + + log.info(f'Cancellation for {oid} is complete!') + if msg.status == 'filled': # conditional execution is fully complete, no more @@ -431,6 +644,8 @@ async def translate_and_relay_brokerd_events( resp = 'broker_executed' + # be sure to pop this stream from our dialogue set + # since the order dialogue should be done. log.info(f'Execution for {oid} is complete!') # just log it @@ -460,15 +675,25 @@ async def translate_and_relay_brokerd_events( # Create and relay response status message # to requesting EMS client - await ems_client_order_stream.send( - Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=broker_details, - ).dict() - ) + try: + ems_client_order_stream = router.dialogues[oid] + await ems_client_order_stream.send( + Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=broker_details, + ).dict() + ) + except KeyError: + log.error( + f'Received `brokerd` msg for unknown client with oid: {oid}') + + # TODO: do we want this to keep things cleaned up? + # it might require a special status from brokerd to affirm the + # flow is complete? + # router.dialogues.pop(oid) async def process_client_order_cmds( @@ -477,11 +702,14 @@ async def process_client_order_cmds( brokerd_order_stream: tractor.MsgStream, symbol: str, - feed: 'Feed', # noqa + feed: Feed, # noqa dark_book: _DarkBook, + router: _Router, ) -> None: + client_dialogues = router.dialogues + # cmd: dict async for cmd in client_order_stream: @@ -489,6 +717,18 @@ async def process_client_order_cmds( action = cmd['action'] oid = cmd['oid'] + + # TODO: make ``tractor.MsgStream`` a frozen type again such that it + # can be stored in sets like the old context was. + # wait, maybe this **is** already working thanks to our parent + # `trio` type? + + # register this stream as an active dialogue for this order id + # such that translated message from the brokerd backend can be + # routed (relayed) to **just** that client stream (and in theory + # others who are registered for such order affiliated msgs). + client_dialogues[oid] = client_order_stream + reqid = dark_book._ems2brokerd_ids.inverse.get(oid) live_entry = dark_book._ems_entries.get(oid) @@ -498,27 +738,32 @@ async def process_client_order_cmds( # check for live-broker order if live_entry: + reqid = live_entry.reqid msg = BrokerdCancel( oid=oid, - reqid=reqid or live_entry.reqid, + reqid=reqid, time_ns=time.time_ns(), ) - # send cancel to brokerd immediately! - log.info("Submitting cancel for live order") - # NOTE: cancel response will be relayed back in messages # from corresponding broker - await brokerd_order_stream.send(msg.dict()) + if reqid: + # send cancel to brokerd immediately! + log.info("Submitting cancel for live order {reqid}") + + await brokerd_order_stream.send(msg.dict()) + + else: + # this might be a cancel for an order that hasn't been + # acked yet by a brokerd, so register a cancel for when + # the order ack does show up later such that the brokerd + # order request can be cancelled at that time. + dark_book._ems_entries[oid] = msg + + # dark trigger cancel else: - # this might be a cancel for an order that hasn't been - # acked yet by a brokerd, so register a cancel for when - # the order ack does show up later - dark_book._ems_entries[oid] = msg - - # check for EMS active exec try: # remove from dark book clearing dark_book.orders[symbol].pop(oid, None) @@ -532,6 +777,8 @@ async def process_client_order_cmds( time_ns=time.time_ns(), ).dict() ) + # de-register this client dialogue + router.dialogues.pop(oid) except KeyError: log.exception(f'No dark order for {symbol}?') @@ -581,17 +828,22 @@ async def process_client_order_cmds( log.info(f'Sending live order to {broker}:\n{pformat(msg)}') await brokerd_order_stream.send(msg.dict()) - # an immediate response should be brokerd ack with order - # id but we register our request as part of the flow + # an immediate response should be ``BrokerdOrderAck`` + # with ems order id from the ``trades_dialogue()`` + # endpoint, but we register our request as part of the + # flow so that if a cancel comes from the requesting + # client, before that ack, when the ack does arrive we + # immediately take the reqid from the broker and cancel + # that live order asap. dark_book._ems_entries[oid] = msg + # "DARK" triggers + # submit order to local EMS book and scan loop, + # effectively a local clearing engine, which + # scans for conditions and triggers matching executions elif exec_mode in ('dark', 'paper') or ( action in ('alert') ): - # submit order to local EMS book and scan loop, - # effectively a local clearing engine, which - # scans for conditions and triggers matching executions - # Auto-gen scanner predicate: # we automatically figure out what the alert check # condition should be based on the current first @@ -637,11 +889,11 @@ async def process_client_order_cmds( percent_away, abs_diff_away ) + resp = 'dark_submitted' + # alerts have special msgs to distinguish if action == 'alert': resp = 'alert_submitted' - else: - resp = 'dark_submitted' await client_order_stream.send( Status( @@ -686,7 +938,7 @@ async def _emsd_main( run (dark order) conditions on inputs and trigger brokerd "live" order submissions. | - - ``translate_and_relay_brokerd_events()``: + - (maybe) ``translate_and_relay_brokerd_events()``: accept normalized trades responses from brokerd, process and relay to ems client(s); this is a effectively a "trade event reponse" proxy-broker. @@ -697,6 +949,8 @@ async def _emsd_main( ''' global _router + assert _router + dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, @@ -711,8 +965,6 @@ async def _emsd_main( # spawn one task per broker feed async with ( - trio.open_nursery() as n, - # TODO: eventually support N-brokers data.open_feed( broker, @@ -732,43 +984,28 @@ async def _emsd_main( book = _router.get_dark_book(broker) book.lasts[(broker, symbol)] = first_quote[symbol]['last'] - trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) - portal = feed._brokerd_portal - - if trades_endpoint is None or _exec_mode == 'paper': - - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running a - # paper-simulator clearing engine. - - # load the paper trading engine - _exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - broker=broker, - symbol=symbol, - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - async with ( - open_trades_endpoint as (brokerd_ctx, positions), - brokerd_ctx.open_stream() as brokerd_trades_stream, + + # only open if one isn't already up: we try to keep + # as few duplicate streams as necessary + _router.maybe_open_brokerd_trades_dialogue( + feed, + symbol, + dark_book, + _exec_mode, + loglevel, + + ) as relay, + + trio.open_nursery() as n, ): + + brokerd_stream = relay.brokerd_dialogue # .clone() + # signal to client that we're started # TODO: we could eventually send back **all** brokerd # positions here? - await ems_ctx.started(positions) + await ems_ctx.started(relay.positions) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates @@ -778,7 +1015,8 @@ async def _emsd_main( n.start_soon( clear_dark_triggers, - brokerd_trades_stream, + # relay.brokerd_dialogue, + brokerd_stream, ems_client_order_stream, feed.stream, @@ -787,72 +1025,42 @@ async def _emsd_main( book ) - # begin processing order events from the target brokerd backend - # by receiving order submission response messages, - # normalizing them to EMS messages and relaying back to - # the piker order client. - n.start_soon( - translate_and_relay_brokerd_events, - - broker, - ems_client_order_stream, - brokerd_trades_stream, - dark_book, - ) - # start inbound (from attached client) order request processing - await process_client_order_cmds( - ems_client_order_stream, - brokerd_trades_stream, - symbol, - feed, - dark_book, - ) + try: + _router.clients.add(ems_client_order_stream) + await process_client_order_cmds( -class _Router(BaseModel): - '''Order router which manages per-broker dark books, alerts, - and clearing related data feed management. + ems_client_order_stream, - ''' - nursery: trio.Nursery + # relay.brokerd_dialogue, + brokerd_stream, - feeds: dict[tuple[str, str], Any] = {} - books: dict[str, _DarkBook] = {} + symbol, + feed, + dark_book, + _router, + ) - class Config: - arbitrary_types_allowed = True - underscore_attrs_are_private = False + finally: + # remove client from "registry" + _router.clients.remove(ems_client_order_stream) - def get_dark_book( - self, - brokername: str, + dialogues = _router.dialogues - ) -> _DarkBook: + for oid, client_stream in dialogues.items(): - return self.books.setdefault(brokername, _DarkBook(brokername)) + if client_stream == ems_client_order_stream: + log.warning( + f'client dialogue is being abandoned:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + ) + dialogues.pop(oid) -_router: _Router = None - - -@tractor.context -async def _setup_persistent_emsd( - - ctx: tractor.Context, - -) -> None: - - global _router - - # open a root "service nursery" for the ``emsd`` actor - async with trio.open_nursery() as service_nursery: - - _router = _Router(nursery=service_nursery) - - # TODO: send back the full set of persistent orders/execs persistent - await ctx.started() - - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() + # TODO: for order dialogues left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index a1d615f5..490ae4b0 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -233,27 +233,34 @@ async def sample_and_broadcast( for (stream, tick_throttle) in subs: - if tick_throttle: - await stream.send(quote) + try: + if tick_throttle: + await stream.send(quote) - else: - try: + else: await stream.send({sym: quote}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - # XXX: do we need to deregister here - # if it's done in the fee bus code? - # so far seems like no since this should all - # be single-threaded. - log.error(f'{stream._ctx.chan.uid} dropped connection') + + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + # XXX: do we need to deregister here + # if it's done in the fee bus code? + # so far seems like no since this should all + # be single-threaded. + log.warning( + f'{stream._ctx.chan.uid} dropped ' + '`brokerd`-quotes-feed connection' + ) + subs.remove((stream, tick_throttle)) async def uniform_rate_send( + rate: float, quote_stream: trio.abc.ReceiveChannel, stream: tractor.MsgStream, + ) -> None: sleep_period = 1/rate - 0.000616 @@ -289,8 +296,14 @@ async def uniform_rate_send( # TODO: now if only we could sync this to the display # rate timing exactly lul - await stream.send({first_quote['symbol']: first_quote}) - break + try: + await stream.send({first_quote['symbol']: first_quote}) + break + except trio.ClosedResourceError: + # if the feed consumer goes down then drop + # out of this rate limiter + log.warning(f'{stream} closed') + return end = time.time() diff = end - start diff --git a/piker/data/feed.py b/piker/data/feed.py index 477e7bac..ed24a095 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -305,6 +305,11 @@ async def attach_feed_bus( ): if tick_throttle: + + # open a bg task which receives quotes over a mem chan + # and only pushes them to the target actor-consumer at + # a max ``tick_throttle`` instantaneous rate. + send, recv = trio.open_memory_channel(2**10) n.start_soon( uniform_rate_send, @@ -321,7 +326,12 @@ async def attach_feed_bus( try: await trio.sleep_forever() + finally: + log.info( + f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') + if tick_throttle: + n.cancel_scope.cancel() bus._subscribers[symbol].remove(sub) @@ -473,11 +483,6 @@ async def open_feed( ctx.open_stream() as stream, ): - # TODO: can we make this work better with the proposed - # context based bidirectional streaming style api proposed in: - # https://github.com/goodboy/tractor/issues/53 - # init_msg = await stream.receive() - # we can only read from shm shm = attach_shm_array( token=init_msg[sym]['shm_token'], @@ -520,4 +525,8 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) - yield feed + try: + yield feed + finally: + # drop the infinite stream connection + await ctx.cancel()