From a7e106be96c504991a7398909e0d29dc593db8e4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Jun 2021 07:48:31 -0400 Subject: [PATCH 1/7] Avoid multiple `brokerd` trades dialogue flows This solves a bunch of issues to do with `brokerd` order status msgs getting relayed for each order to **every** correspondingly connected EMS client. Previously we weren't keeping track of which emsd orders were associated with which clients so you had backend msgs getting broadcast to all clients which not only resulted in duplicate (and sometimes erroneous, due to state tracking) actions taking place in the UI's order mode, but it's also just duplicate traffic (usually to the same actor) over multiple logical streams. Instead, only keep up **one** (cached) stream with the `trades_dialogue()` endpoint such that **all** emsd orders route over that single connection to the particular `brokerd` actor. --- piker/clearing/_ems.py | 372 +++++++++++++++++++++++++---------------- 1 file changed, 229 insertions(+), 143 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 9b1c3246..b3b4679d 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -18,9 +18,10 @@ In da suit parlances: "Execution management systems" """ +from contextlib import asynccontextmanager +from dataclasses import dataclass, field from pprint import pformat import time -from dataclasses import dataclass, field from typing import AsyncIterator, Callable, Any from bidict import bidict @@ -31,6 +32,7 @@ import tractor from .. import data from ..log import get_logger from ..data._normalize import iterticks +from ..data.feed import Feed from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -254,30 +256,72 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') -# TODO: lots of cases still to handle -# XXX: right now this is very very ad-hoc to IB -# - short-sale but securities haven't been located, in this case we -# should probably keep the order in some kind of weird state or cancel -# it outright? -# status='PendingSubmit', message=''), -# status='Cancelled', message='Error 404, -# reqId 1550: Order held while securities are located.'), -# status='PreSubmitted', message='')], +class _Router(BaseModel): + '''Order router which manages per-broker dark books, alerts, + and clearing related data feed management. + + ''' + nursery: trio.Nursery + + feeds: dict[tuple[str, str], Any] = {} + books: dict[str, _DarkBook] = {} + dialogues: dict[str, list[tractor.MsgStream]] = {} + relays: dict[str, tuple[dict, tractor.MsgStream]] = {} + + class Config: + arbitrary_types_allowed = True + underscore_attrs_are_private = False + + def get_dark_book( + self, + brokername: str, + + ) -> _DarkBook: + + return self.books.setdefault(brokername, _DarkBook(brokername)) + + +_router: _Router = None + + +@tractor.context +async def _setup_persistent_emsd( + + ctx: tractor.Context, + +) -> None: + + global _router + + # open a root "service nursery" for the ``emsd`` actor + async with trio.open_nursery() as service_nursery: + + _router = _Router(nursery=service_nursery) + + # TODO: send back the full set of persistent orders/execs persistent + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + async def translate_and_relay_brokerd_events( broker: str, - ems_client_order_stream: tractor.MsgStream, + # ems_client_order_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream, book: _DarkBook, + router: _Router, ) -> AsyncIterator[dict]: - """Trades update loop - receive updates from broker, convert - to EMS responses, transmit to ordering client(s). + '''Trades update loop - receive updates from ``brokerd`` trades + endpoint, convert to EMS response msgs, transmit **only** to + ordering client(s). - This is where trade confirmations from the broker are processed - and appropriate responses relayed back to the original EMS client - actor. There is a messaging translation layer throughout. + This is where trade confirmations from the broker are processed and + appropriate responses relayed **only** back to the original EMS + client actor. There is a messaging translation layer throughout. Expected message translation(s): @@ -286,10 +330,10 @@ async def translate_and_relay_brokerd_events( 'status' -> relabel as 'broker_', if complete send 'executed' 'fill' -> 'broker_filled' - Currently accepted status values from IB: + Currently handled status values from IB: {'presubmitted', 'submitted', 'cancelled', 'inactive'} - """ + ''' async for brokerd_msg in brokerd_trades_stream: name = brokerd_msg['name'] @@ -298,10 +342,14 @@ async def translate_and_relay_brokerd_events( if name == 'position': - # relay through position msgs immediately - await ems_client_order_stream.send( - BrokerdPosition(**brokerd_msg).dict() - ) + # relay through position msgs immediately by + # broadcasting updates on all client streams + for oid, ems_client_order_stream in router.dialogues.items(): + + await ems_client_order_stream.send( + BrokerdPosition(**brokerd_msg).dict() + ) + continue # Get the broker (order) request id, this **must** be normalized @@ -331,7 +379,7 @@ async def translate_and_relay_brokerd_events( # may be an order msg specified as "external" to the # piker ems flow (i.e. generated by some other # external broker backend client (like tws for ib) - ext = brokerd_msg.get('external') + ext = brokerd_msg['broker_details'].get('external') if ext: log.error(f"External trade event {ext}") @@ -377,6 +425,7 @@ async def translate_and_relay_brokerd_events( resp = None broker_details = {} + client_flow_complete: bool = False if name in ( 'error', @@ -407,22 +456,13 @@ async def translate_and_relay_brokerd_events( elif name in ( 'status', ): - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # everyone doin camel case msg = BrokerdStatus(**brokerd_msg) + if msg.status == 'cancelled': + + client_flow_complete = True + log.info(f'Cancellation for {oid} is complete!') + if msg.status == 'filled': # conditional execution is fully complete, no more @@ -431,6 +471,9 @@ async def translate_and_relay_brokerd_events( resp = 'broker_executed' + # be sure to pop this stream from our dialogue set + # since the order dialogue should be done. + client_flow_complete = True log.info(f'Execution for {oid} is complete!') # just log it @@ -460,6 +503,7 @@ async def translate_and_relay_brokerd_events( # Create and relay response status message # to requesting EMS client + ems_client_order_stream = router.dialogues[oid] await ems_client_order_stream.send( Status( oid=oid, @@ -470,6 +514,9 @@ async def translate_and_relay_brokerd_events( ).dict() ) + if client_flow_complete: + router.dialogues.pop(oid) + async def process_client_order_cmds( @@ -477,8 +524,9 @@ async def process_client_order_cmds( brokerd_order_stream: tractor.MsgStream, symbol: str, - feed: 'Feed', # noqa + feed: Feed, # noqa dark_book: _DarkBook, + router: _Router, ) -> None: @@ -489,6 +537,16 @@ async def process_client_order_cmds( action = cmd['action'] oid = cmd['oid'] + + # register this stream as an active dialogue for this order id + # such that translated message from the brokerd backend can be + # routed (relayed) to **just** that client stream (and in theory + # others who are registered for such order affiliated msgs). + + # TODO: make ``tractor.MsgStream`` a frozen type again such that it + # can be stored in sets like the old context was. + router.dialogues[oid] = client_order_stream + reqid = dark_book._ems2brokerd_ids.inverse.get(oid) live_entry = dark_book._ems_entries.get(oid) @@ -499,14 +557,17 @@ async def process_client_order_cmds( # check for live-broker order if live_entry: + reqid = reqid or live_entry.reqid + assert reqid + msg = BrokerdCancel( oid=oid, - reqid=reqid or live_entry.reqid, + reqid=reqid, time_ns=time.time_ns(), ) # send cancel to brokerd immediately! - log.info("Submitting cancel for live order") + log.info("Submitting cancel for live order {reqid}") # NOTE: cancel response will be relayed back in messages # from corresponding broker @@ -515,7 +576,8 @@ async def process_client_order_cmds( else: # this might be a cancel for an order that hasn't been # acked yet by a brokerd, so register a cancel for when - # the order ack does show up later + # the order ack does show up later such that the brokerd + # order request can be cancelled at that time. dark_book._ems_entries[oid] = msg # check for EMS active exec @@ -532,6 +594,8 @@ async def process_client_order_cmds( time_ns=time.time_ns(), ).dict() ) + # de-register this client dialogue + router.dialogues.pop(oid) except KeyError: log.exception(f'No dark order for {symbol}?') @@ -581,17 +645,22 @@ async def process_client_order_cmds( log.info(f'Sending live order to {broker}:\n{pformat(msg)}') await brokerd_order_stream.send(msg.dict()) - # an immediate response should be brokerd ack with order - # id but we register our request as part of the flow + # an immediate response should be ``BrokerdOrderAck`` + # with ems order id from the ``trades_dialogue()`` + # endpoint, but we register our request as part of the + # flow so that if a cancel comes from the requesting + # client, before that ack, when the ack does arrive we + # immediately take the reqid from the broker and cancel + # that order with them immediately. dark_book._ems_entries[oid] = msg + # "DARK" triggers + # submit order to local EMS book and scan loop, + # effectively a local clearing engine, which + # scans for conditions and triggers matching executions elif exec_mode in ('dark', 'paper') or ( action in ('alert') ): - # submit order to local EMS book and scan loop, - # effectively a local clearing engine, which - # scans for conditions and triggers matching executions - # Auto-gen scanner predicate: # we automatically figure out what the alert check # condition should be based on the current first @@ -637,11 +706,11 @@ async def process_client_order_cmds( percent_away, abs_diff_away ) + resp = 'dark_submitted' + # alerts have special msgs to distinguish if action == 'alert': resp = 'alert_submitted' - else: - resp = 'dark_submitted' await client_order_stream.send( Status( @@ -652,6 +721,97 @@ async def process_client_order_cmds( ) +@asynccontextmanager +async def maybe_open_brokerd_trades_dialogue( + + router: _Router, + feed: Feed, + broker: str, + symbol: str, + dark_book: _DarkBook, + _exec_mode: str, + loglevel: str, + +) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) + portal = feed._brokerd_portal + + if broker in _router.relays: + + positions, brokerd_trades_stream = _router.relays[broker] + + # TODO: get updated positions here? + yield positions, brokerd_trades_stream + return + + if trades_endpoint is None or _exec_mode == 'paper': + + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running a + # paper-simulator clearing engine. + + # load the paper trading engine + _exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + broker=broker, + symbol=symbol, + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + async with ( + + open_trades_endpoint as (brokerd_ctx, positions), + brokerd_ctx.open_stream() as brokerd_trades_stream, + trio.open_nursery() as n, + + ): + # XXX: really we only want one stream per `emsd` actor + # to relay global `brokerd` order events unless we're + # doing to expect each backend to relay only orders + # affiliated with a particular ``trades_dialogue()`` + # session (seems annoying for implementers). So, here + # we cache the relay task and instead of running multiple + # tasks (which will result in multiples of the same msg being + # relayed for each EMS client) we just register each client + # stream to this single relay loop using _router.dialogues + + # begin processing order events from the target brokerd backend + # by receiving order submission response messages, + # normalizing them to EMS messages and relaying back to + # the piker order client set. + + n.start_soon( + + translate_and_relay_brokerd_events, + + broker, + # ems_client_order_stream, + brokerd_trades_stream, + dark_book, + _router, + ) + + _router.relays[broker] = (positions, brokerd_trades_stream) + + yield positions, brokerd_trades_stream + + @tractor.context async def _emsd_main( @@ -697,6 +857,8 @@ async def _emsd_main( ''' global _router + assert _router + dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, @@ -711,8 +873,6 @@ async def _emsd_main( # spawn one task per broker feed async with ( - trio.open_nursery() as n, - # TODO: eventually support N-brokers data.open_feed( broker, @@ -732,39 +892,24 @@ async def _emsd_main( book = _router.get_dark_book(broker) book.lasts[(broker, symbol)] = first_quote[symbol]['last'] - trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) - portal = feed._brokerd_portal - - if trades_endpoint is None or _exec_mode == 'paper': - - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running a - # paper-simulator clearing engine. - - # load the paper trading engine - _exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - broker=broker, - symbol=symbol, - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - async with ( - open_trades_endpoint as (brokerd_ctx, positions), - brokerd_ctx.open_stream() as brokerd_trades_stream, + + # only open if one isn't already up: we try to keep + # as few duplicate streams as necessary + maybe_open_brokerd_trades_dialogue( + _router, + feed, + broker, + symbol, + dark_book, + _exec_mode, + loglevel, + + ) as (positions, brokerd_trades_stream), + + trio.open_nursery() as n, ): + # signal to client that we're started # TODO: we could eventually send back **all** brokerd # positions here? @@ -787,72 +932,13 @@ async def _emsd_main( book ) - # begin processing order events from the target brokerd backend - # by receiving order submission response messages, - # normalizing them to EMS messages and relaying back to - # the piker order client. - n.start_soon( - translate_and_relay_brokerd_events, - - broker, - ems_client_order_stream, - brokerd_trades_stream, - dark_book, - ) - # start inbound (from attached client) order request processing await process_client_order_cmds( + ems_client_order_stream, brokerd_trades_stream, symbol, feed, dark_book, + _router, ) - - -class _Router(BaseModel): - '''Order router which manages per-broker dark books, alerts, - and clearing related data feed management. - - ''' - nursery: trio.Nursery - - feeds: dict[tuple[str, str], Any] = {} - books: dict[str, _DarkBook] = {} - - class Config: - arbitrary_types_allowed = True - underscore_attrs_are_private = False - - def get_dark_book( - self, - brokername: str, - - ) -> _DarkBook: - - return self.books.setdefault(brokername, _DarkBook(brokername)) - - -_router: _Router = None - - -@tractor.context -async def _setup_persistent_emsd( - - ctx: tractor.Context, - -) -> None: - - global _router - - # open a root "service nursery" for the ``emsd`` actor - async with trio.open_nursery() as service_nursery: - - _router = _Router(nursery=service_nursery) - - # TODO: send back the full set of persistent orders/execs persistent - await ctx.started() - - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() From 8f05254c80f7a963a30f1afa7487f527a12818db Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Jun 2021 10:57:08 -0400 Subject: [PATCH 2/7] Better live order handling logic --- piker/clearing/_ems.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index b3b4679d..54497a5d 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -556,9 +556,7 @@ async def process_client_order_cmds( # check for live-broker order if live_entry: - - reqid = reqid or live_entry.reqid - assert reqid + reqid = live_entry.reqid msg = BrokerdCancel( oid=oid, @@ -566,21 +564,24 @@ async def process_client_order_cmds( time_ns=time.time_ns(), ) - # send cancel to brokerd immediately! - log.info("Submitting cancel for live order {reqid}") - # NOTE: cancel response will be relayed back in messages # from corresponding broker - await brokerd_order_stream.send(msg.dict()) + if reqid: + # send cancel to brokerd immediately! + log.info("Submitting cancel for live order {reqid}") + + await brokerd_order_stream.send(msg.dict()) + + else: + # this might be a cancel for an order that hasn't been + # acked yet by a brokerd, so register a cancel for when + # the order ack does show up later such that the brokerd + # order request can be cancelled at that time. + dark_book._ems_entries[oid] = msg + + # dark trigger cancel else: - # this might be a cancel for an order that hasn't been - # acked yet by a brokerd, so register a cancel for when - # the order ack does show up later such that the brokerd - # order request can be cancelled at that time. - dark_book._ems_entries[oid] = msg - - # check for EMS active exec try: # remove from dark book clearing dark_book.orders[symbol].pop(oid, None) From 003fa6254f2da94b7d5fbf6af9bc670b2d4f091e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Jun 2021 13:19:38 -0400 Subject: [PATCH 3/7] Don't forget to pop the brokerd dialogue on teardown.. --- piker/clearing/_ems.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 54497a5d..5bdf34d3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -514,8 +514,11 @@ async def translate_and_relay_brokerd_events( ).dict() ) - if client_flow_complete: - router.dialogues.pop(oid) + # TODO: do we want this to keep things cleaned up? + # it might require a special status from brokerd to affirm the + # flow is complete? + # if client_flow_complete: + # router.dialogues.pop(oid) async def process_client_order_cmds( @@ -810,7 +813,12 @@ async def maybe_open_brokerd_trades_dialogue( _router.relays[broker] = (positions, brokerd_trades_stream) - yield positions, brokerd_trades_stream + try: + yield positions, brokerd_trades_stream + + finally: + # remove from cache so next client will respawn if needed + _router.relays.pop(broker) @tractor.context From 8b6fb83257688b3c9bc4b4cdc4d08f2ae2914c56 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Jun 2021 00:53:32 -0400 Subject: [PATCH 4/7] Pop subscriber streams on connection errors --- piker/data/_sampling.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index a1d615f5..793417e9 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -248,12 +248,15 @@ async def sample_and_broadcast( # so far seems like no since this should all # be single-threaded. log.error(f'{stream._ctx.chan.uid} dropped connection') + subs.remove((stream, tick_throttle)) async def uniform_rate_send( + rate: float, quote_stream: trio.abc.ReceiveChannel, stream: tractor.MsgStream, + ) -> None: sleep_period = 1/rate - 0.000616 @@ -289,8 +292,14 @@ async def uniform_rate_send( # TODO: now if only we could sync this to the display # rate timing exactly lul - await stream.send({first_quote['symbol']: first_quote}) - break + try: + await stream.send({first_quote['symbol']: first_quote}) + break + except trio.ClosedResourceError: + # if the feed consumer goes down then drop + # out of this rate limiter + log.warning(f'{stream} closed') + return end = time.time() diff = end - start From 9f897ba75c371aad91f1d1f04c4d5fc0e9019591 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Jun 2021 00:57:58 -0400 Subject: [PATCH 5/7] Support multiple client trade flows over one `brokerd~ There is no reason to have more then `brokerd` trades dialogue stream open per `emsd`. Here we minimize to managing that lone stream and multiplexing msgs from each client such that multiple clients can be connected to the ems, conducting trading without requiring multiple ems-client connections to the backend broker and without the broker being aware there are even multiple flows going on. This patch also sets up for being able to have ems clients which register to receive and track trade flows from other piker clients thus enabling so called "multi-player" trading where orders for both paper and live trades can be shared between multiple participants in the form of a pre-broker, local clearing service and trade signals dark book. --- piker/clearing/_ems.py | 423 ++++++++++++++++++++++++++--------------- 1 file changed, 270 insertions(+), 153 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5bdf34d3..2cd6a1b9 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -27,12 +27,14 @@ from typing import AsyncIterator, Callable, Any from bidict import bidict from pydantic import BaseModel import trio +from trio_typing import TaskStatus import tractor from .. import data from ..log import get_logger from ..data._normalize import iterticks from ..data.feed import Feed +from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -82,15 +84,16 @@ def mk_check( @dataclass class _DarkBook: - """Client-side execution book. + '''EMS-trigger execution book. - Contains conditions for executions (aka "orders") which are not - exposed to brokers and thus the market; i.e. these are privacy - focussed "client side" orders. + Contains conditions for executions (aka "orders" or "triggers") + which are not exposed to brokers and thus the market; i.e. these are + privacy focussed "client side" orders which are submitted in real-time + based on specified trigger conditions. - A singleton instance is created per EMS actor (for now). + A an instance per `brokerd` is created per EMS actor (for now). - """ + ''' broker: str # levels which have an executable action (eg. alert, order, signal) @@ -256,17 +259,34 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') +@dataclass +class TradesRelay: + brokerd_dialogue: tractor.MsgStream + positions: dict[str, float] + consumers: int = 0 + + class _Router(BaseModel): - '''Order router which manages per-broker dark books, alerts, - and clearing related data feed management. + '''Order router which manages and tracks per-broker dark book, + alerts, clearing and related data feed management. + + A singleton per ``emsd`` actor. ''' + # setup at actor spawn time nursery: trio.Nursery feeds: dict[tuple[str, str], Any] = {} + + # broker to book map books: dict[str, _DarkBook] = {} + + # order id to client stream map + clients: set[tractor.MsgStream] = set() dialogues: dict[str, list[tractor.MsgStream]] = {} - relays: dict[str, tuple[dict, tractor.MsgStream]] = {} + + # brokername to trades-dialogues streams with ``brokerd`` actors + relays: dict[str, TradesRelay] = {} class Config: arbitrary_types_allowed = True @@ -280,10 +300,160 @@ class _Router(BaseModel): return self.books.setdefault(brokername, _DarkBook(brokername)) + @asynccontextmanager + async def maybe_open_brokerd_trades_dialogue( + + self, + feed: Feed, + symbol: str, + dark_book: _DarkBook, + _exec_mode: str, + loglevel: str, + + ) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + relay = self.relays.get(feed.mod.name) + + if relay is None: + + relay = await self.nursery.start( + open_brokerd_trades_dialogue, + self, + feed, + symbol, + _exec_mode, + loglevel, + ) + + relay.consumers += 1 + + # TODO: get updated positions here? + assert relay.brokerd_dialogue + try: + yield relay + + finally: + + # TODO: what exactly needs to be torn down here or + # are we just consumer tracking? + + relay.consumers -= 1 + _router: _Router = None +async def open_brokerd_trades_dialogue( + + router: _Router, + feed: Feed, + symbol: str, + _exec_mode: str, + loglevel: str, + + task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, + +) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) + + broker = feed.mod.name + + # TODO: make a `tractor` bug about this! + # portal = feed._brokerd_portal + + # XXX: we must have our own portal + channel otherwise + # when the data feed closes it may result in a half-closed/fucked + # channel that the brokerd side thinks is still open somehow!? + async with maybe_spawn_brokerd( + + broker, + loglevel=loglevel, + + ) as portal: + + if trades_endpoint is None or _exec_mode == 'paper': + + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running a + # paper-simulator clearing engine. + + # load the paper trading engine + _exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + broker=broker, + symbol=symbol, + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + try: + async with ( + + open_trades_endpoint as (brokerd_ctx, positions), + brokerd_ctx.open_stream() as brokerd_trades_stream, + + ): + # XXX: really we only want one stream per `emsd` actor + # to relay global `brokerd` order events unless we're + # doing to expect each backend to relay only orders + # affiliated with a particular ``trades_dialogue()`` + # session (seems annoying for implementers). So, here + # we cache the relay task and instead of running multiple + # tasks (which will result in multiples of the same msg being + # relayed for each EMS client) we just register each client + # stream to this single relay loop using _router.dialogues + + # begin processing order events from the target brokerd backend + # by receiving order submission response messages, + # normalizing them to EMS messages and relaying back to + # the piker order client set. + + relay = TradesRelay( + brokerd_dialogue=brokerd_trades_stream, + positions=positions, + consumers=1 + ) + + _router.relays[broker] = relay + + # the ems scan loop may be cancelled by the client but we + # want to keep the ``brokerd`` dialogue up regardless + + task_status.started(relay) + + await translate_and_relay_brokerd_events( + broker, + brokerd_trades_stream, + _router, + ) + + # this context should block here indefinitely until + # the ``brokerd`` task either dies or is cancelled + + finally: + # parent context must have been closed + # remove from cache so next client will respawn if needed + _router.relays.pop(broker) + + @tractor.context async def _setup_persistent_emsd( @@ -298,20 +468,18 @@ async def _setup_persistent_emsd( _router = _Router(nursery=service_nursery) - # TODO: send back the full set of persistent orders/execs persistent + # TODO: send back the full set of persistent + # orders/execs? await ctx.started() - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down + # allow service tasks to run until cancelled await trio.sleep_forever() async def translate_and_relay_brokerd_events( broker: str, - # ems_client_order_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream, - book: _DarkBook, router: _Router, ) -> AsyncIterator[dict]: @@ -334,6 +502,11 @@ async def translate_and_relay_brokerd_events( {'presubmitted', 'submitted', 'cancelled', 'inactive'} ''' + book = router.get_dark_book(broker) + relay = router.relays[broker] + + assert relay.brokerd_dialogue == brokerd_trades_stream + async for brokerd_msg in brokerd_trades_stream: name = brokerd_msg['name'] @@ -342,13 +515,15 @@ async def translate_and_relay_brokerd_events( if name == 'position': + pos_msg = BrokerdPosition(**brokerd_msg).dict() + + # keep up to date locally in ``emsd`` + relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg) + # relay through position msgs immediately by # broadcasting updates on all client streams - for oid, ems_client_order_stream in router.dialogues.items(): - - await ems_client_order_stream.send( - BrokerdPosition(**brokerd_msg).dict() - ) + for client_stream in router.clients: + await client_stream.send(pos_msg) continue @@ -425,7 +600,7 @@ async def translate_and_relay_brokerd_events( resp = None broker_details = {} - client_flow_complete: bool = False + # client_flow_complete: bool = False if name in ( 'error', @@ -460,7 +635,7 @@ async def translate_and_relay_brokerd_events( if msg.status == 'cancelled': - client_flow_complete = True + # client_flow_complete = True log.info(f'Cancellation for {oid} is complete!') if msg.status == 'filled': @@ -473,7 +648,7 @@ async def translate_and_relay_brokerd_events( # be sure to pop this stream from our dialogue set # since the order dialogue should be done. - client_flow_complete = True + # client_flow_complete = True log.info(f'Execution for {oid} is complete!') # just log it @@ -503,22 +678,26 @@ async def translate_and_relay_brokerd_events( # Create and relay response status message # to requesting EMS client - ems_client_order_stream = router.dialogues[oid] - await ems_client_order_stream.send( - Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=broker_details, - ).dict() - ) + try: + ems_client_order_stream = router.dialogues[oid] + await ems_client_order_stream.send( + Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=broker_details, + ).dict() + ) + except KeyError: + log.error( + f'Received `brokerd` msg for unknown client with oid: {oid}') - # TODO: do we want this to keep things cleaned up? - # it might require a special status from brokerd to affirm the - # flow is complete? - # if client_flow_complete: - # router.dialogues.pop(oid) + # TODO: do we want this to keep things cleaned up? + # it might require a special status from brokerd to affirm the + # flow is complete? + # if client_flow_complete: + # router.dialogues.pop(oid) async def process_client_order_cmds( @@ -533,6 +712,8 @@ async def process_client_order_cmds( ) -> None: + client_dialogues = router.dialogues + # cmd: dict async for cmd in client_order_stream: @@ -541,14 +722,16 @@ async def process_client_order_cmds( action = cmd['action'] oid = cmd['oid'] + # TODO: make ``tractor.MsgStream`` a frozen type again such that it + # can be stored in sets like the old context was. + # wait, maybe this **is** already working thanks to our parent + # `trio` type? + # register this stream as an active dialogue for this order id # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory # others who are registered for such order affiliated msgs). - - # TODO: make ``tractor.MsgStream`` a frozen type again such that it - # can be stored in sets like the old context was. - router.dialogues[oid] = client_order_stream + client_dialogues[oid] = client_order_stream reqid = dark_book._ems2brokerd_ids.inverse.get(oid) live_entry = dark_book._ems_entries.get(oid) @@ -655,7 +838,7 @@ async def process_client_order_cmds( # flow so that if a cancel comes from the requesting # client, before that ack, when the ack does arrive we # immediately take the reqid from the broker and cancel - # that order with them immediately. + # that live order asap. dark_book._ems_entries[oid] = msg # "DARK" triggers @@ -725,102 +908,6 @@ async def process_client_order_cmds( ) -@asynccontextmanager -async def maybe_open_brokerd_trades_dialogue( - - router: _Router, - feed: Feed, - broker: str, - symbol: str, - dark_book: _DarkBook, - _exec_mode: str, - loglevel: str, - -) -> tuple[dict, tractor.MsgStream]: - '''Open and yield ``brokerd`` trades dialogue context-stream if none - already exists. - - ''' - trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) - portal = feed._brokerd_portal - - if broker in _router.relays: - - positions, brokerd_trades_stream = _router.relays[broker] - - # TODO: get updated positions here? - yield positions, brokerd_trades_stream - return - - if trades_endpoint is None or _exec_mode == 'paper': - - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running a - # paper-simulator clearing engine. - - # load the paper trading engine - _exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - broker=broker, - symbol=symbol, - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - - async with ( - - open_trades_endpoint as (brokerd_ctx, positions), - brokerd_ctx.open_stream() as brokerd_trades_stream, - trio.open_nursery() as n, - - ): - # XXX: really we only want one stream per `emsd` actor - # to relay global `brokerd` order events unless we're - # doing to expect each backend to relay only orders - # affiliated with a particular ``trades_dialogue()`` - # session (seems annoying for implementers). So, here - # we cache the relay task and instead of running multiple - # tasks (which will result in multiples of the same msg being - # relayed for each EMS client) we just register each client - # stream to this single relay loop using _router.dialogues - - # begin processing order events from the target brokerd backend - # by receiving order submission response messages, - # normalizing them to EMS messages and relaying back to - # the piker order client set. - - n.start_soon( - - translate_and_relay_brokerd_events, - - broker, - # ems_client_order_stream, - brokerd_trades_stream, - dark_book, - _router, - ) - - _router.relays[broker] = (positions, brokerd_trades_stream) - - try: - yield positions, brokerd_trades_stream - - finally: - # remove from cache so next client will respawn if needed - _router.relays.pop(broker) - - @tractor.context async def _emsd_main( @@ -855,7 +942,7 @@ async def _emsd_main( run (dark order) conditions on inputs and trigger brokerd "live" order submissions. | - - ``translate_and_relay_brokerd_events()``: + - (maybe) ``translate_and_relay_brokerd_events()``: accept normalized trades responses from brokerd, process and relay to ems client(s); this is a effectively a "trade event reponse" proxy-broker. @@ -905,24 +992,24 @@ async def _emsd_main( # only open if one isn't already up: we try to keep # as few duplicate streams as necessary - maybe_open_brokerd_trades_dialogue( - _router, + _router.maybe_open_brokerd_trades_dialogue( feed, - broker, symbol, dark_book, _exec_mode, loglevel, - ) as (positions, brokerd_trades_stream), + ) as relay, trio.open_nursery() as n, ): + brokerd_stream = relay.brokerd_dialogue # .clone() + # signal to client that we're started # TODO: we could eventually send back **all** brokerd # positions here? - await ems_ctx.started(positions) + await ems_ctx.started(relay.positions) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates @@ -932,7 +1019,8 @@ async def _emsd_main( n.start_soon( clear_dark_triggers, - brokerd_trades_stream, + # relay.brokerd_dialogue, + brokerd_stream, ems_client_order_stream, feed.stream, @@ -942,12 +1030,41 @@ async def _emsd_main( ) # start inbound (from attached client) order request processing - await process_client_order_cmds( + try: + _router.clients.add(ems_client_order_stream) - ems_client_order_stream, - brokerd_trades_stream, - symbol, - feed, - dark_book, - _router, - ) + await process_client_order_cmds( + + ems_client_order_stream, + + # relay.brokerd_dialogue, + brokerd_stream, + + symbol, + feed, + dark_book, + _router, + ) + + finally: + # remove client from "registry" + _router.clients.remove(ems_client_order_stream) + + dialogues = _router.dialogues + + for oid, client_stream in dialogues.items(): + + if client_stream == ems_client_order_stream: + + log.warning( + f'client dialogue is being abandoned:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + ) + dialogues.pop(oid) + + # TODO: for order dialogues left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.). From f4a998655b4d247673362a98a8a6cf2581037c04 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 5 Jul 2021 09:41:35 -0400 Subject: [PATCH 6/7] Feed detach must explicitly unsub throttled streams If a client attaches to a quotes data feed and requests a throttle rate, be sure to unsub that side-band memchan + task when it detaches and especially so on any transport connection error. Also, use an explicit `tractor.Context.cancel()` on the client feed block exit since we removed the implicit cancel option from the `tractor` api. --- piker/data/_sampling.py | 32 ++++++++++++++++++-------------- piker/data/feed.py | 21 +++++++++++++++------ 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 793417e9..490ae4b0 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -233,22 +233,26 @@ async def sample_and_broadcast( for (stream, tick_throttle) in subs: - if tick_throttle: - await stream.send(quote) + try: + if tick_throttle: + await stream.send(quote) - else: - try: + else: await stream.send({sym: quote}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - # XXX: do we need to deregister here - # if it's done in the fee bus code? - # so far seems like no since this should all - # be single-threaded. - log.error(f'{stream._ctx.chan.uid} dropped connection') - subs.remove((stream, tick_throttle)) + + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + # XXX: do we need to deregister here + # if it's done in the fee bus code? + # so far seems like no since this should all + # be single-threaded. + log.warning( + f'{stream._ctx.chan.uid} dropped ' + '`brokerd`-quotes-feed connection' + ) + subs.remove((stream, tick_throttle)) async def uniform_rate_send( diff --git a/piker/data/feed.py b/piker/data/feed.py index 477e7bac..ed24a095 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -305,6 +305,11 @@ async def attach_feed_bus( ): if tick_throttle: + + # open a bg task which receives quotes over a mem chan + # and only pushes them to the target actor-consumer at + # a max ``tick_throttle`` instantaneous rate. + send, recv = trio.open_memory_channel(2**10) n.start_soon( uniform_rate_send, @@ -321,7 +326,12 @@ async def attach_feed_bus( try: await trio.sleep_forever() + finally: + log.info( + f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') + if tick_throttle: + n.cancel_scope.cancel() bus._subscribers[symbol].remove(sub) @@ -473,11 +483,6 @@ async def open_feed( ctx.open_stream() as stream, ): - # TODO: can we make this work better with the proposed - # context based bidirectional streaming style api proposed in: - # https://github.com/goodboy/tractor/issues/53 - # init_msg = await stream.receive() - # we can only read from shm shm = attach_shm_array( token=init_msg[sym]['shm_token'], @@ -520,4 +525,8 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) - yield feed + try: + yield feed + finally: + # drop the infinite stream connection + await ctx.cancel() From 91907bf5ef896f521aaa428e077cf47eeb99eb65 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 8 Jul 2021 10:17:31 -0400 Subject: [PATCH 7/7] Drop old dialogues pop flag, the client does it on teardown --- piker/clearing/_ems.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 2cd6a1b9..7dbbfbad 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -91,7 +91,7 @@ class _DarkBook: privacy focussed "client side" orders which are submitted in real-time based on specified trigger conditions. - A an instance per `brokerd` is created per EMS actor (for now). + An instance per `brokerd` is created per EMS actor (for now). ''' broker: str @@ -365,11 +365,11 @@ async def open_brokerd_trades_dialogue( broker = feed.mod.name - # TODO: make a `tractor` bug about this! + # TODO: make a `tractor` bug/test for this! # portal = feed._brokerd_portal # XXX: we must have our own portal + channel otherwise - # when the data feed closes it may result in a half-closed/fucked + # when the data feed closes it may result in a half-closed # channel that the brokerd side thinks is still open somehow!? async with maybe_spawn_brokerd( @@ -600,7 +600,6 @@ async def translate_and_relay_brokerd_events( resp = None broker_details = {} - # client_flow_complete: bool = False if name in ( 'error', @@ -635,7 +634,6 @@ async def translate_and_relay_brokerd_events( if msg.status == 'cancelled': - # client_flow_complete = True log.info(f'Cancellation for {oid} is complete!') if msg.status == 'filled': @@ -648,7 +646,6 @@ async def translate_and_relay_brokerd_events( # be sure to pop this stream from our dialogue set # since the order dialogue should be done. - # client_flow_complete = True log.info(f'Execution for {oid} is complete!') # just log it @@ -696,8 +693,7 @@ async def translate_and_relay_brokerd_events( # TODO: do we want this to keep things cleaned up? # it might require a special status from brokerd to affirm the # flow is complete? - # if client_flow_complete: - # router.dialogues.pop(oid) + # router.dialogues.pop(oid) async def process_client_order_cmds( @@ -1067,4 +1063,4 @@ async def _emsd_main( # system to take over management. Likely we # want to allow the user to choose what kind # of policy to use (eg. cancel all orders - # from client, run some algo, etc.). + # from client, run some algo, etc.)