From 49433ea87dfbc01bb8315384a552e8fb9cbd9f6e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 4 Oct 2022 21:52:24 -0400 Subject: [PATCH 01/14] Run dark-clear-loop in daemon task This enables "headless" dark order matching and clearing where an `emsd` daemon subactor can be left running with active dark (or other algorithmic) orders which will still trigger despite to attached-controlling ems-client. Impl details: - rename/add `Router.maybe_open_trade_relays()` which now does all work of starting up ems-side long living clearing and relay tasks and the associated data feed; make is a `Nursery.start()`-able task instead of an `@acm`. - drop `open_brokerd_trades_dialog()` and move/factor contents into the above method. - add support for a `router.client_broadcast('all', msg)` to wholesale fan out a msg to all clients. --- piker/clearing/_ems.py | 505 +++++++++++++++++++---------------------- 1 file changed, 234 insertions(+), 271 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 23b50ddf..c0b06efe 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -23,7 +23,6 @@ from collections import ( defaultdict, # ChainMap, ) -from contextlib import asynccontextmanager from math import isnan from pprint import pformat import time @@ -41,9 +40,12 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed, maybe_open_feed +from ..data.feed import ( + Feed, + maybe_open_feed, +) from ..data.types import Struct -from .._daemon import maybe_spawn_brokerd +# from .._daemon import maybe_spawn_brokerd from . 
import _paper_engine as paper from ._messages import ( Order, @@ -135,7 +137,6 @@ class _DarkBook(Struct): float, ] = {} - # _ems_entries: dict[str, str] = {} _active: dict = {} _ems2brokerd_ids: dict[str, str] = bidict() @@ -247,7 +248,6 @@ async def clear_dark_triggers( await brokerd_orders_stream.send(brokerd_msg) - # book._ems_entries[oid] = live_req # book._msgflows[oid].maps.insert(0, live_req) case _: @@ -383,62 +383,174 @@ class Router(Struct): if not stream._closed ) - @asynccontextmanager - async def maybe_open_brokerd_trades_dialogue( + async def maybe_open_trade_relays( self, - feed: Feed, - symbol: str, - dark_book: _DarkBook, + fqsn: str, exec_mode: str, loglevel: str, - ) -> tuple[dict, tractor.MsgStream]: + task_status: TaskStatus[ + tuple[TradesRelay, Feed] + ] = trio.TASK_STATUS_IGNORED, + + ) -> tuple[TradesRelay, Feed]: ''' Open and yield ``brokerd`` trades dialogue context-stream if none already exists. ''' - broker = feed.mod.name - relay: TradesRelay = self.relays.get(broker) + from ..data._source import unpack_fqsn + broker, symbol, suffix = unpack_fqsn(fqsn) - if ( - relay is None - - # We always want to spawn a new relay for the paper engine - # per symbol since we need a new tractor context to be - # opened for every every symbol such that a new data feed - # and ``PaperBoi`` client will be created and then used to - # simulate clearing events. 
- or exec_mode == 'paper' + async with ( + maybe_open_feed( + [fqsn], + loglevel=loglevel, + ) as (feed, quote_stream), ): + brokermod = feed.mod + broker = brokermod.name - relay = await self.nursery.start( - open_brokerd_trades_dialog, - self, - feed, - symbol, - exec_mode, - loglevel, - ) + # XXX: this should be initial price quote from target provider + first_quote: dict = feed.first_quotes[fqsn] + book: _DarkBook = self.get_dark_book(broker) + book.lasts[fqsn]: float = first_quote['last'] - self.nursery.start_soon( - translate_and_relay_brokerd_events, - broker, - relay.brokerd_stream, - self, - ) + relay: TradesRelay = self.relays.get(broker) + if ( + relay - relay.consumers += 1 + # We always want to spawn a new relay for the paper engine + # per symbol since we need a new tractor context to be + # opened for every every symbol such that a new data feed + # and ``PaperBoi`` client will be created and then used to + # simulate clearing events. + and exec_mode != 'paper' + ): + task_status.started((relay, feed)) + await trio.sleep_forever() + return - # TODO: get updated positions here? - assert relay.brokerd_stream - try: - yield relay - finally: + trades_endpoint = getattr(brokermod, 'trades_dialogue', None) + if ( + trades_endpoint is None + or exec_mode == 'paper' + ): + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running + # a paper-simulator clearing engine. - # TODO: what exactly needs to be torn down here or - # are we just consumer tracking? 
- relay.consumers -= 1 + # load the paper trading engine + exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + fqsn='.'.join([symbol, broker]), + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = feed.portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + # open trades-dialog endpoint with backend broker + try: + positions: list[BrokerdPosition] + accounts: tuple[str] + + async with ( + open_trades_endpoint as ( + brokerd_ctx, + (positions, accounts,), + ), + brokerd_ctx.open_stream() as brokerd_trades_stream, + ): + # XXX: really we only want one stream per `emsd` + # actor to relay global `brokerd` order events + # unless we're going to expect each backend to + # relay only orders affiliated with a particular + # ``trades_dialogue()`` session (seems annoying + # for implementers). So, here we cache the relay + # task and instead of running multiple tasks + # (which will result in multiples of the same + # msg being relayed for each EMS client) we just + # register each client stream to this single + # relay loop in the dialog table. + + # begin processing order events from the target + # brokerd backend by receiving order submission + # response messages, normalizing them to EMS + # messages and relaying back to the piker order + # client set. + + # locally cache and track positions per account with + # a table of (brokername, acctid) -> `BrokerdPosition` + # msgs. + pps = {} + for msg in positions: + log.info(f'loading pp: {msg}') + + account = msg['account'] + + # TODO: better value error for this which + # dumps the account and message and states the + # mismatch.. 
+ assert account in accounts + + pps.setdefault( + (broker, account), + [], + ).append(msg) + + relay = TradesRelay( + brokerd_stream=brokerd_trades_stream, + positions=pps, + accounts=accounts, + consumers=1, + ) + + self.relays[broker] = relay + + # spawn a ``brokerd`` order control dialog stream + # that syncs lifetime with the parent `emsd` daemon. + self.nursery.start_soon( + translate_and_relay_brokerd_events, + broker, + relay.brokerd_stream, + self, + ) + + # dark book clearing loop, also lives with parent + # daemon to allow dark order clearing while no + # client is connected. + self.nursery.start_soon( + clear_dark_triggers, + self, + relay.brokerd_stream, + quote_stream, + broker, + fqsn, # form: ... + book + ) + + task_status.started((relay, feed)) + + # this context should block here indefinitely until + # the ``brokerd`` task either dies or is cancelled + await trio.sleep_forever() + + finally: + # parent context must have been closed remove from cache so + # next client will respawn if needed + relay = self.relays.pop(broker, None) + if not relay: + log.warning(f'Relay for {broker} was already removed!?') async def client_broadcast( self, @@ -447,7 +559,14 @@ class Router(Struct): ) -> None: to_remove: set[tractor.MsgStream] = set() - subs = self.subscribers[sub_key] + + if sub_key == 'all': + subs = set() + for s in self.subscribers.values(): + subs |= s + else: + subs = self.subscribers[sub_key] + for client_stream in subs: try: await client_stream.send(msg) @@ -467,134 +586,6 @@ class Router(Struct): _router: Router = None -async def open_brokerd_trades_dialog( - - router: Router, - feed: Feed, - symbol: str, - exec_mode: str, - loglevel: str, - - task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, - -) -> tuple[dict, tractor.MsgStream]: - ''' - Open and yield ``brokerd`` trades dialogue context-stream if none - already exists. 
- - ''' - trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) - - broker = feed.mod.name - - # TODO: make a `tractor` bug/test for this! - # if only i could member what the problem was.. - # probably some GC of the portal thing? - # portal = feed.portal - - # XXX: we must have our own portal + channel otherwise - # when the data feed closes it may result in a half-closed - # channel that the brokerd side thinks is still open somehow!? - async with maybe_spawn_brokerd( - broker, - loglevel=loglevel, - - ) as portal: - if ( - trades_endpoint is None - or exec_mode == 'paper' - ): - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running - # a paper-simulator clearing engine. - - # load the paper trading engine - exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - fqsn='.'.join([symbol, broker]), - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - - try: - positions: list[BrokerdPosition] - accounts: tuple[str] - - async with ( - open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), - brokerd_ctx.open_stream() as brokerd_trades_stream, - ): - # XXX: really we only want one stream per `emsd` actor - # to relay global `brokerd` order events unless we're - # going to expect each backend to relay only orders - # affiliated with a particular ``trades_dialogue()`` - # session (seems annoying for implementers). So, here - # we cache the relay task and instead of running multiple - # tasks (which will result in multiples of the same msg being - # relayed for each EMS client) we just register each client - # stream to this single relay loop in the dialog table. 
- - # begin processing order events from the target brokerd backend - # by receiving order submission response messages, - # normalizing them to EMS messages and relaying back to - # the piker order client set. - - # locally cache and track positions per account with - # a table of (brokername, acctid) -> `BrokerdPosition` - # msgs. - pps = {} - for msg in positions: - log.info(f'loading pp: {msg}') - - account = msg['account'] - - # TODO: better value error for this which - # dumps the account and message and states the - # mismatch.. - assert account in accounts - - pps.setdefault( - (broker, account), - [], - ).append(msg) - - relay = TradesRelay( - brokerd_stream=brokerd_trades_stream, - positions=pps, - accounts=accounts, - consumers=1, - ) - - router.relays[broker] = relay - - # the ems scan loop may be cancelled by the client but we - # want to keep the ``brokerd`` dialogue up regardless - - task_status.started(relay) - - # this context should block here indefinitely until - # the ``brokerd`` task either dies or is cancelled - await trio.sleep_forever() - - finally: - # parent context must have been closed remove from cache so - # next client will respawn if needed - relay = router.relays.pop(broker, None) - if not relay: - log.warning(f'Relay for {broker} was already removed!?') - - @tractor.context async def _setup_persistent_emsd( @@ -677,7 +668,7 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - await router.client_broadcast(sym, pos_msg) + await router.client_broadcast('all', pos_msg) continue # BrokerdOrderAck @@ -827,7 +818,7 @@ async def translate_and_relay_brokerd_events( # ``.oid`` in the msg since we're planning to # maybe-kinda offer that via using ``Status`` # in the longer run anyway? 
- log.warning(f'Unkown fill for {fmsg}') + log.warning(f'Unknown fill for {fmsg}') continue # proxy through the "fill" result(s) @@ -1026,7 +1017,6 @@ async def process_client_order_cmds( # acked yet by a brokerd, so register a cancel for when # the order ack does show up later such that the brokerd # order request can be cancelled at that time. - # dark_book._ems_entries[oid] = msg # special case for now.. status.req = to_brokerd_msg @@ -1286,7 +1276,6 @@ async def _emsd_main( from ..data._source import unpack_fqsn broker, symbol, suffix = unpack_fqsn(fqsn) - dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, # or a named arg with ctx in it and a type annotation of @@ -1294,108 +1283,82 @@ async def _emsd_main( ems_ctx = ctx # spawn one task per broker feed + relay: TradesRelay feed: Feed - async with ( - maybe_open_feed( - [fqsn], - loglevel=loglevel, - ) as (feed, quote_stream), - ): - # XXX: this should be initial price quote from target provider - first_quote: dict = feed.first_quotes[fqsn] - book: _DarkBook = _router.get_dark_book(broker) - book.lasts[fqsn]: float = first_quote['last'] + # open a stream with the brokerd backend for order flow dialogue + # only open if one isn't already up: we try to keep as few duplicate + # streams as necessary. + # TODO: should we try using `tractor.trionics.maybe_open_context()` + # here? + relay, feed = await _router.nursery.start( + _router.maybe_open_trade_relays, + fqsn, + exec_mode, + loglevel, + ) + brokerd_stream = relay.brokerd_stream + dark_book = _router.get_dark_book(broker) - # open a stream with the brokerd backend for order - # flow dialogue - async with ( + # signal to client that we're started and deliver + # all known pps and accounts for this ``brokerd``. 
+ await ems_ctx.started(( + relay.positions, + list(relay.accounts), + dark_book._active, + )) - # only open if one isn't already up: we try to keep - # as few duplicate streams as necessary - _router.maybe_open_brokerd_trades_dialogue( + # establish 2-way stream with requesting order-client and + # begin handling inbound order requests and updates + async with ems_ctx.open_stream() as client_stream: + + # register the client side before starting the + # brokerd-side relay task to ensure the client is + # delivered all exisiting open orders on startup. + _router.clients.add(client_stream) + + # TODO: instead of by fqsn we need a subscription + # system/schema here to limit what each new client is + # allowed to see in terms of broadcasted order flow + # updates per dialog. + _router.subscribers[fqsn].add(client_stream) + + # start inbound (from attached client) order request processing + # main entrypoint, run here until cancelled. + try: + await process_client_order_cmds( + client_stream, + brokerd_stream, + fqsn, feed, - symbol, dark_book, - exec_mode, - loglevel, + _router, + ) - ) as relay, - trio.open_nursery() as n, - ): - - brokerd_stream = relay.brokerd_stream - - # signal to client that we're started and deliver - # all known pps and accounts for this ``brokerd``. - await ems_ctx.started(( - relay.positions, - list(relay.accounts), - book._active, - )) - - # establish 2-way stream with requesting order-client and - # begin handling inbound order requests and updates - async with ems_ctx.open_stream() as client_stream: - - # register the client side before starting the - # brokerd-side relay task to ensure the client is - # delivered all exisiting open orders on startup. - _router.clients.add(client_stream) - - # TODO: instead of by fqsn we need a subscription - # system/schema here to limit what each new client is - # allowed to see in terms of broadcasted order flow - # updates per dialog. 
- _router.subscribers[fqsn].add(client_stream) - - # trigger scan and exec loop - n.start_soon( - clear_dark_triggers, - _router, - brokerd_stream, - quote_stream, - broker, - fqsn, # form: ... - book + finally: + # try to remove client from "registry" + try: + _router.clients.remove(client_stream) + except KeyError: + log.warning( + f'Stream {client_stream._ctx.chan.uid}' + ' was already dropped?' ) - # start inbound (from attached client) order request processing - # main entrypoint, run here until cancelled. - try: - await process_client_order_cmds( - client_stream, - brokerd_stream, - fqsn, - feed, - dark_book, - _router, - ) + _router.subscribers[fqsn].remove(client_stream) + dialogs = _router.dialogs + for oid, client_streams in dialogs.items(): + if client_stream in client_streams: + client_streams.remove(client_stream) - finally: - # try to remove client from "registry" - try: - _router.clients.remove(client_stream) - except KeyError: + # TODO: for order dialogs left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.) + if not client_streams: log.warning( - f'Stream {client_stream._ctx.chan.uid}' - ' was already dropped?' + f'Order dialog is being unmonitored:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' ) - - _router.subscribers[fqsn].remove(client_stream) - dialogs = _router.dialogs - for oid, client_streams in dialogs.items(): - if client_stream in client_streams: - client_streams.remove(client_stream) - - # TODO: for order dialogs left "alive" in - # the ems this is where we should allow some - # system to take over management. Likely we - # want to allow the user to choose what kind - # of policy to use (eg. cancel all orders - # from client, run some algo, etc.) 
- if not client_streams: - log.warning( - f'Order dialog is being unmonitored:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' - ) From d3abfce540e5e2bf8c6dbca38df461cb1c398c33 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 5 Oct 2022 10:55:29 -0400 Subject: [PATCH 02/14] Start notify mod, linux only --- piker/ui/_notify.py | 95 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 piker/ui/_notify.py diff --git a/piker/ui/_notify.py b/piker/ui/_notify.py new file mode 100644 index 00000000..5879200b --- /dev/null +++ b/piker/ui/_notify.py @@ -0,0 +1,95 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Notifications utils. + +""" +import os +import platform +import subprocess +from typing import Optional + +import trio + +from ..log import get_logger +from ..clearing._messages import ( + Status, +) + +log = get_logger(__name__) + + +_dbus_uid: Optional[str] = '' + + +async def notify_from_ems_status_msg( + uuid: str, + msg: Status, + is_subproc: bool = False, + +) -> None: + ''' + Send a linux desktop notification. + + Handle subprocesses by discovering the dbus user id + on first call. + + ''' + if platform.system() != "Linux": + return + + # TODO: this in another task? 
+ # not sure if this will ever be a bottleneck, + # we probably could do graphics stuff first tho? + + if is_subproc: + global _dbus_uid + if not _dbus_uid: + su = os.environ['SUDO_USER'] + + # TODO: use `trio` but we need to use nursery.start() + # to use pipes? + # result = await trio.run_process( + result = subprocess.run( + [ + 'id', + '-u', + su, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + # check=True + ) + _dbus_uid = result.stdout.decode("utf-8").replace('\n', '') + + os.environ['DBUS_SESSION_BUS_ADDRESS'] = ( + f'unix:path=/run/user/{_dbus_uid}/bus' + ) + + result = await trio.run_process( + [ + 'notify-send', + '-u', 'normal', + '-t', '1616', + 'piker', + + # TODO: add in standard fill/exec info that maybe we + # pack in a broker independent way? + f"'{msg.resp}: {msg.req.price}'", + ], + ) + log.runtime(result) From b65c02336dd100f20bd995d4de8ce504e66c4f00 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 5 Oct 2022 10:55:47 -0400 Subject: [PATCH 03/14] Don't short circuit relay loop when headless If no clients are connected we now process as normal and try to fire a desktop notification on linux. --- piker/clearing/_ems.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index c0b06efe..1906d1e5 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -44,8 +44,8 @@ from ..data.feed import ( Feed, maybe_open_feed, ) +from ..ui._notify import notify_from_ems_status_msg from ..data.types import Struct -# from .._daemon import maybe_spawn_brokerd from . 
import _paper_engine as paper from ._messages import ( Order, @@ -557,7 +557,7 @@ class Router(Struct): sub_key: str, msg: dict, - ) -> None: + ) -> bool: to_remove: set[tractor.MsgStream] = set() if sub_key == 'all': @@ -567,9 +567,11 @@ class Router(Struct): else: subs = self.subscribers[sub_key] + sent_some: bool = False for client_stream in subs: try: await client_stream.send(msg) + sent_some = True except ( trio.ClosedResourceError, trio.BrokenResourceError, @@ -582,6 +584,8 @@ class Router(Struct): if to_remove: subs.difference_update(to_remove) + return sent_some + _router: Router = None @@ -760,13 +764,8 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_streams = router.get_subs(oid) status_msg = book._active.get(oid) - - if ( - not ems_client_order_streams - or not status_msg - ): + if not status_msg: log.warning( f'Received status for untracked dialog {oid}:\n' f'{fmsg}' @@ -788,10 +787,20 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await router.client_broadcast( + sent_some = await router.client_broadcast( status_msg.req.symbol, status_msg, ) + if not sent_some: + log.info( + 'No clients attached, firing notification for msg:\n' + f'{fmsg}' + ) + await notify_from_ems_status_msg( + oid, + status_msg, + is_subproc=True, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!') From 525f805cdbe202afcdf7f9dc9b34e9815e722304 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 5 Oct 2022 10:59:50 -0400 Subject: [PATCH 04/14] Port order mode to new notify routine --- piker/ui/order_mode.py | 45 +++++++++--------------------------------- 1 file changed, 9 insertions(+), 36 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 59b07758..1cabeb10 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py 
@@ -23,7 +23,6 @@ from contextlib import asynccontextmanager from dataclasses import dataclass, field from functools import partial from pprint import pformat -import platform import time from typing import ( Optional, @@ -64,6 +63,7 @@ from ..clearing._messages import ( BrokerdPosition, ) from ._forms import open_form_input_handling +from ._notify import notify_from_ems_status_msg if TYPE_CHECKING: @@ -530,39 +530,6 @@ class OrderMode: else: log.warn("No line(s) for order {uuid}!?") - async def on_exec( - self, - - uuid: str, - msg: Status, - - ) -> None: - - # DESKTOP NOTIFICATIONS - # - # TODO: this in another task? - # not sure if this will ever be a bottleneck, - # we probably could do graphics stuff first tho? - - # TODO: make this not trash. - # XXX: linux only for now - if platform.system() == "Windows": - return - - result = await trio.run_process( - [ - 'notify-send', - '-u', 'normal', - '-t', '1616', - 'piker', - - # TODO: add in standard fill/exec info that maybe we - # pack in a broker independent way? 
- f'{msg.resp}: {msg.req.price}', - ], - ) - log.runtime(result) - def on_cancel( self, uuid: str @@ -1064,7 +1031,10 @@ async def process_trade_msg( ) mode.lines.remove_line(uuid=oid) msg.req = req - await mode.on_exec(oid, msg) + await notify_from_ems_status_msg( + uuid, + msg, + ) # response to completed 'dialog' for order request case Status( @@ -1073,7 +1043,10 @@ async def process_trade_msg( req=req, ): msg.req = Order(**req) - await mode.on_exec(oid, msg) + await notify_from_ems_status_msg( + uuid, + msg, + ) mode.lines.remove_line(uuid=oid) # each clearing tick is responded individually From 2fbfe583dd90e52702d012c4af6925e3ab17be89 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 5 Oct 2022 13:08:53 -0400 Subject: [PATCH 05/14] Drop the `Router.clients: set`, `.subscribers` is enough --- piker/clearing/_ems.py | 47 ++++++++++++++---------------------------- 1 file changed, 15 insertions(+), 32 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 1906d1e5..e26c27fb 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -248,8 +248,6 @@ async def clear_dark_triggers( await brokerd_orders_stream.send(brokerd_msg) - # book._msgflows[oid].maps.insert(0, live_req) - case _: raise ValueError(f'Invalid dark book entry: {cmd}') @@ -333,9 +331,6 @@ class Router(Struct): # broker to book map books: dict[str, _DarkBook] = {} - # order id to client stream map - clients: set[tractor.MsgStream] = set() - # sets of clients mapped from subscription keys subscribers: defaultdict[ str, # sub key, default fqsn @@ -577,7 +572,6 @@ class Router(Struct): trio.BrokenResourceError, ): to_remove.add(client_stream) - self.clients.remove(client_stream) log.warning( f'client for {client_stream} was already closed?') @@ -1324,8 +1318,6 @@ async def _emsd_main( # register the client side before starting the # brokerd-side relay task to ensure the client is # delivered all exisiting open orders on startup. 
- _router.clients.add(client_stream) - # TODO: instead of by fqsn we need a subscription # system/schema here to limit what each new client is # allowed to see in terms of broadcasted order flow @@ -1345,29 +1337,20 @@ async def _emsd_main( ) finally: - # try to remove client from "registry" - try: - _router.clients.remove(client_stream) - except KeyError: - log.warning( - f'Stream {client_stream._ctx.chan.uid}' - ' was already dropped?' - ) - + # try to remove client from subscription registry _router.subscribers[fqsn].remove(client_stream) - dialogs = _router.dialogs - for oid, client_streams in dialogs.items(): - if client_stream in client_streams: - client_streams.remove(client_stream) - # TODO: for order dialogs left "alive" in - # the ems this is where we should allow some - # system to take over management. Likely we - # want to allow the user to choose what kind - # of policy to use (eg. cancel all orders - # from client, run some algo, etc.) - if not client_streams: - log.warning( - f'Order dialog is being unmonitored:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' - ) + for oid, client_streams in _router.dialogs.items(): + client_streams.discard(client_stream) + + # TODO: for order dialogs left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.) 
+ if not client_streams: + log.warning( + f'Order dialog is not being monitored:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + ) From 3924c66bd0d2a3b918bca483996da680925742cc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 5 Oct 2022 14:25:47 -0400 Subject: [PATCH 06/14] Move headless notifies into `.client_broadcast()` --- piker/clearing/_ems.py | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index e26c27fb..49f2a115 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -216,6 +216,7 @@ async def clear_dark_triggers( brokerd_msg: Optional[BrokerdOrder] = None match cmd: + # alert: nothing to do but relay a status # back to the requesting ems client case Order(action='alert'): @@ -245,7 +246,6 @@ async def clear_dark_triggers( price=submit_price, size=size, ) - await brokerd_orders_stream.send(brokerd_msg) case _: @@ -551,6 +551,7 @@ class Router(Struct): self, sub_key: str, msg: dict, + notify_on_headless: bool = True, ) -> bool: to_remove: set[tractor.MsgStream] = set() @@ -578,6 +579,18 @@ class Router(Struct): if to_remove: subs.difference_update(to_remove) + if ( + not sent_some + and notify_on_headless + ): + log.info( + 'No clients attached, firing notification for msg:\n' + f'{msg}' + ) + await notify_from_ems_status_msg( + msg, + is_subproc=True, + ) return sent_some @@ -781,20 +794,10 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - sent_some = await router.client_broadcast( + await router.client_broadcast( status_msg.req.symbol, status_msg, ) - if not sent_some: - log.info( - 'No clients attached, firing notification for msg:\n' - f'{fmsg}' - ) - await notify_from_ems_status_msg( - oid, - status_msg, - is_subproc=True, - ) if status == 'closed': log.info(f'Execution for {oid} is complete!') From 26d6e10ad74942742b90b0b9f80e6d7e9509de6b Mon Sep 17 00:00:00 2001 
From: Tyler Goodlet Date: Wed, 5 Oct 2022 14:26:20 -0400 Subject: [PATCH 07/14] Parameterize duration, pprint msg --- piker/data/types.py | 5 ++++- piker/ui/_notify.py | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/piker/data/types.py b/piker/data/types.py index c23f6266..4bdb8063 100644 --- a/piker/data/types.py +++ b/piker/data/types.py @@ -49,10 +49,13 @@ class Struct( hasattr(sys, 'ps1') # TODO: check if we're in pdb ): - return f'Struct({pformat(self.to_dict())})' + return self.pformat() return super().__repr__() + def pformat(self) -> str: + return f'Struct({pformat(self.to_dict())})' + def copy( self, update: Optional[dict] = None, diff --git a/piker/ui/_notify.py b/piker/ui/_notify.py index 5879200b..c3da8d4b 100644 --- a/piker/ui/_notify.py +++ b/piker/ui/_notify.py @@ -37,8 +37,8 @@ _dbus_uid: Optional[str] = '' async def notify_from_ems_status_msg( - uuid: str, msg: Status, + duration: int = 3000, is_subproc: bool = False, ) -> None: @@ -84,12 +84,12 @@ async def notify_from_ems_status_msg( [ 'notify-send', '-u', 'normal', - '-t', '1616', + '-t', f'{duration}', 'piker', # TODO: add in standard fill/exec info that maybe we # pack in a broker independent way? 
- f"'{msg.resp}: {msg.req.price}'", + f"'{msg.pformat()}'", ], ) log.runtime(result) From c246dcef6f4377a4b89be385a351f3f8a6410a68 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 5 Oct 2022 14:26:55 -0400 Subject: [PATCH 08/14] Drop uuid from notify func inputs --- piker/ui/order_mode.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 1cabeb10..b049aa7c 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -1031,10 +1031,7 @@ async def process_trade_msg( ) mode.lines.remove_line(uuid=oid) msg.req = req - await notify_from_ems_status_msg( - uuid, - msg, - ) + await notify_from_ems_status_msg(msg) # response to completed 'dialog' for order request case Status( @@ -1043,10 +1040,7 @@ async def process_trade_msg( req=req, ): msg.req = Order(**req) - await notify_from_ems_status_msg( - uuid, - msg, - ) + await notify_from_ems_status_msg(msg) mode.lines.remove_line(uuid=oid) # each clearing tick is responded individually From 8cd56cb6d390427c855873a55f58878077703e48 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 Oct 2022 08:18:35 -0400 Subject: [PATCH 09/14] Flip ems-side-client (`OrderBook`) to be a struct `@dataclass` is so 2 years ago ;) Also rename `.update()` -> `.send_update()` to be a bit more explicit about actually sending an update msg. --- piker/clearing/_client.py | 12 ++++-------- piker/clearing/_ems.py | 2 ++ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 11eb9b69..03fb62d3 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -19,15 +19,14 @@ Orders and execution client API. 
""" from contextlib import asynccontextmanager as acm -from typing import Dict from pprint import pformat -from dataclasses import dataclass, field import trio import tractor from tractor.trionics import broadcast_receiver from ..log import get_logger +from ..data.types import Struct from ._ems import _emsd_main from .._daemon import maybe_open_emsd from ._messages import Order, Cancel @@ -37,8 +36,7 @@ from ..brokers import get_brokermod log = get_logger(__name__) -@dataclass -class OrderBook: +class OrderBook(Struct): '''EMS-client-side order book ctl and tracking. A style similar to "model-view" is used here where this api is @@ -53,9 +51,7 @@ class OrderBook: # mem channels used to relay order requests to the EMS daemon _to_ems: trio.abc.SendChannel _from_order_book: trio.abc.ReceiveChannel - - _sent_orders: Dict[str, Order] = field(default_factory=dict) - _ready_to_receive: trio.Event = trio.Event() + _sent_orders: dict[str, Order] = {} def send( self, @@ -66,7 +62,7 @@ class OrderBook: self._to_ems.send_nowait(msg) return msg - def update( + def send_update( self, uuid: str, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 49f2a115..17b63730 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -679,6 +679,8 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams + # TODO: this should be subscription based for privacy + # eventually! 
await router.client_broadcast('all', pos_msg) continue From 4c96a4878e45192ba1c4cd50be72890b48b6aa0c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 Oct 2022 08:38:19 -0400 Subject: [PATCH 10/14] Process unknown order mode msgs --- piker/clearing/_ems.py | 5 +++++ piker/ui/order_mode.py | 14 ++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 17b63730..99b268be 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -842,6 +842,11 @@ async def translate_and_relay_brokerd_events( status_msg.reqid = reqid status_msg.brokerd_msg = msg + # TODO: if no client is connected (aka we're + # headless) we should record the fill in the + # ``.msg_flow`` chain and re-transmit on client + # connect so that fills can be displayed in a + # chart? await router.client_broadcast( status_msg.req.symbol, status_msg, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index b049aa7c..3cda56ff 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -433,7 +433,7 @@ class OrderMode: size = dialog.order.size # NOTE: sends modified order msg to EMS - self.book.update( + self.book.send_update( uuid=line.dialog.uuid, price=level, size=size, @@ -1047,12 +1047,12 @@ async def process_trade_msg( case Status(resp='fill'): # handle out-of-piker fills reporting? - known_order = book._sent_orders.get(oid) - if not known_order: + order: Order = book._sent_orders.get(oid) + if not order: log.warning(f'order {oid} is unknown') - return + order = msg.req - action = known_order.action + action = order.action details = msg.brokerd_msg # TODO: some kinda progress system @@ -1077,7 +1077,9 @@ async def process_trade_msg( ), ) - # TODO: how should we look this up? + # TODO: append these fill events to the position's clear + # table? 
+ # tracker = mode.trackers[msg['account']] # tracker.live_pp.fills.append(msg) From 1d9ab7b0de2743544bfd0ee617811f4ac8d84669 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 Oct 2022 08:38:46 -0400 Subject: [PATCH 11/14] More direct import --- piker/ui/_position.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index c986022a..9ed5b7ec 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -42,7 +42,8 @@ from ._anchors import ( gpath_pin, ) from ..calc import humanize, pnl, puterize -from ..clearing._allocate import Allocator, Position +from ..clearing._allocate import Allocator +from ..pp import Position from ..data._normalize import iterticks from ..data.feed import Feed from ..data.types import Struct From 2bc25e35930a3f810d13d063f94e88a1ef271592 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 Oct 2022 12:39:27 -0400 Subject: [PATCH 12/14] Repair already-open order relay, fix causality dilemma With the refactor of the dark loop into a daemon task already-open order relaying from a `brokerd` was broken since no subscribed clients were registered prior to the relay loop sending status msgs for such existing live orders. Repair that by adding one more synchronization phase to the `Router.open_trade_relays()` task: deliver a `client_ready: trio.Event` which is set by the client task once the client stream has been established and don't start the `brokerd` order dialog relay loop until this event is ready. Further implementation deats: - factor the `brokerd` relay caching back into it's own `@acm` method: `maybe_open_brokerd_dialog()` since we do want (but only this) stream singleton-cached per broker backend. - spawn all relay tasks on every entry for the moment until we figure out what we're caching against (any client pre-existing right, which would mean there's an entry in the `.subscribers` table?) 
- rename `_DarkBook` -> `DarkBook` and `DarkBook.orders` -> `.triggers` --- piker/clearing/_ems.py | 349 ++++++++++++++++++++++------------------- 1 file changed, 190 insertions(+), 159 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 99b268be..6fe815dc 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -23,6 +23,7 @@ from collections import ( defaultdict, # ChainMap, ) +from contextlib import asynccontextmanager as acm from math import isnan from pprint import pformat import time @@ -40,6 +41,10 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks +from ..data._source import ( + unpack_fqsn, + mk_fqsn, +) from ..data.feed import ( Feed, maybe_open_feed, @@ -104,7 +109,7 @@ def mk_check( ) -class _DarkBook(Struct): +class DarkBook(Struct): ''' EMS-trigger execution book. @@ -119,7 +124,7 @@ class _DarkBook(Struct): broker: str # levels which have an executable action (eg. alert, order, signal) - orders: dict[ + triggers: dict[ str, # symbol dict[ str, # uuid @@ -131,14 +136,8 @@ class _DarkBook(Struct): ] ] = {} - # tracks most recent values per symbol each from data feed - lasts: dict[ - str, - float, - ] = {} - - _active: dict = {} - + lasts: dict[str, float] = {} # quote prices + _active: dict[str, Status] = {} # active order dialogs _ems2brokerd_ids: dict[str, str] = bidict() @@ -157,7 +156,7 @@ async def clear_dark_triggers( broker: str, fqsn: str, - book: _DarkBook, + book: DarkBook, ) -> None: ''' @@ -174,7 +173,7 @@ async def clear_dark_triggers( async for quotes in quote_stream: # start = time.time() for sym, quote in quotes.items(): - execs = book.orders.get(sym, {}) + execs = book.triggers.get(sym, {}) for tick in iterticks( quote, # dark order price filter(s) @@ -292,7 +291,7 @@ async def clear_dark_triggers( else: # condition scan loop complete log.debug(f'execs are {execs}') if execs: - book.orders[fqsn] = execs + book.triggers[fqsn] = execs # print(f'execs scan took: 
{time.time() - start}') @@ -329,7 +328,7 @@ class Router(Struct): nursery: trio.Nursery # broker to book map - books: dict[str, _DarkBook] = {} + books: dict[str, DarkBook] = {} # sets of clients mapped from subscription keys subscribers: defaultdict[ @@ -360,9 +359,9 @@ class Router(Struct): self, brokername: str, - ) -> _DarkBook: + ) -> DarkBook: - return self.books.setdefault(brokername, _DarkBook(brokername)) + return self.books.setdefault(brokername, DarkBook(brokername)) def get_subs( self, @@ -378,7 +377,130 @@ class Router(Struct): if not stream._closed ) - async def maybe_open_trade_relays( + @acm + async def maybe_open_brokerd_dialog( + self, + feed: Feed, + exec_mode: str, + symbol: str, + loglevel: str, + + ) -> None: + brokermod = feed.mod + broker = brokermod.name + relay: TradesRelay = self.relays.get(broker) + if ( + relay + + # We always want to spawn a new relay for the paper + # engine per symbol since we need a new tractor context + # to be opened for every every symbol such that a new + # data feed and ``PaperBoi`` client will be created and + # then used to simulate clearing events. + and exec_mode != 'paper' + ): + # deliver already cached instance + yield relay + return + + trades_endpoint = getattr(brokermod, 'trades_dialogue', None) + if ( + trades_endpoint is None + or exec_mode == 'paper' + ): + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running + # a paper-simulator clearing engine. 
+ + # load the paper trading engine + exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + fqsn='.'.join([symbol, broker]), + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = feed.portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + # open trades-dialog endpoint with backend broker + positions: list[BrokerdPosition] + accounts: tuple[str] + + async with ( + open_trades_endpoint as ( + brokerd_ctx, + (positions, accounts,), + ), + brokerd_ctx.open_stream() as brokerd_trades_stream, + ): + # XXX: really we only want one stream per `emsd` + # actor to relay global `brokerd` order events + # unless we're going to expect each backend to + # relay only orders affiliated with a particular + # ``trades_dialogue()`` session (seems annoying + # for implementers). So, here we cache the relay + # task and instead of running multiple tasks + # (which will result in multiples of the same + # msg being relayed for each EMS client) we just + # register each client stream to this single + # relay loop in the dialog table. + + # begin processing order events from the target + # brokerd backend by receiving order submission + # response messages, normalizing them to EMS + # messages and relaying back to the piker order + # client set. + + # locally cache and track positions per account with + # a table of (brokername, acctid) -> `BrokerdPosition` + # msgs. + pps = {} + for msg in positions: + log.info(f'loading pp: {msg}') + + account = msg['account'] + + # TODO: better value error for this which + # dumps the account and message and states the + # mismatch.. 
+ assert account in accounts + + pps.setdefault( + (broker, account), + [], + ).append(msg) + + relay = TradesRelay( + brokerd_stream=brokerd_trades_stream, + positions=pps, + accounts=accounts, + consumers=1, + ) + + self.relays[broker] = relay + + # this context should block here indefinitely until + # the ``brokerd`` task either dies or is cancelled + try: + yield relay + finally: + # parent context must have been closed remove from cache so + # next client will respawn if needed + relay = self.relays.pop(broker, None) + if not relay: + log.warning( + f'Relay for {broker} was already removed!?') + + async def open_trade_relays( self, fqsn: str, exec_mode: str, @@ -408,144 +530,48 @@ class Router(Struct): # XXX: this should be initial price quote from target provider first_quote: dict = feed.first_quotes[fqsn] - book: _DarkBook = self.get_dark_book(broker) + book: DarkBook = self.get_dark_book(broker) book.lasts[fqsn]: float = first_quote['last'] - relay: TradesRelay = self.relays.get(broker) - if ( - relay + async with self.maybe_open_brokerd_dialog( + feed=feed, + exec_mode=exec_mode, + symbol=symbol, + loglevel=loglevel, + ) as relay: + + # dark book clearing loop, also lives with parent + # daemon to allow dark order clearing while no + # client is connected. + self.nursery.start_soon( + clear_dark_triggers, + self, + relay.brokerd_stream, + quote_stream, + broker, + fqsn, # form: ... + book + ) + + client_ready = trio.Event() + task_status.started((relay, feed, client_ready)) + + # sync to the client side by waiting for the stream + # connection setup before relaying any existing live + # orders from the brokerd. + await client_ready.wait() + assert self.subscribers + + # spawn a ``brokerd`` order control dialog stream + # that syncs lifetime with the parent `emsd` daemon. 
+ self.nursery.start_soon( + translate_and_relay_brokerd_events, + broker, + relay.brokerd_stream, + self, + ) - # We always want to spawn a new relay for the paper engine - # per symbol since we need a new tractor context to be - # opened for every every symbol such that a new data feed - # and ``PaperBoi`` client will be created and then used to - # simulate clearing events. - and exec_mode != 'paper' - ): - task_status.started((relay, feed)) await trio.sleep_forever() - return - - trades_endpoint = getattr(brokermod, 'trades_dialogue', None) - if ( - trades_endpoint is None - or exec_mode == 'paper' - ): - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running - # a paper-simulator clearing engine. - - # load the paper trading engine - exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - fqsn='.'.join([symbol, broker]), - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = feed.portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - - # open trades-dialog endpoint with backend broker - try: - positions: list[BrokerdPosition] - accounts: tuple[str] - - async with ( - open_trades_endpoint as ( - brokerd_ctx, - (positions, accounts,), - ), - brokerd_ctx.open_stream() as brokerd_trades_stream, - ): - # XXX: really we only want one stream per `emsd` - # actor to relay global `brokerd` order events - # unless we're going to expect each backend to - # relay only orders affiliated with a particular - # ``trades_dialogue()`` session (seems annoying - # for implementers). 
So, here we cache the relay - # task and instead of running multiple tasks - # (which will result in multiples of the same - # msg being relayed for each EMS client) we just - # register each client stream to this single - # relay loop in the dialog table. - - # begin processing order events from the target - # brokerd backend by receiving order submission - # response messages, normalizing them to EMS - # messages and relaying back to the piker order - # client set. - - # locally cache and track positions per account with - # a table of (brokername, acctid) -> `BrokerdPosition` - # msgs. - pps = {} - for msg in positions: - log.info(f'loading pp: {msg}') - - account = msg['account'] - - # TODO: better value error for this which - # dumps the account and message and states the - # mismatch.. - assert account in accounts - - pps.setdefault( - (broker, account), - [], - ).append(msg) - - relay = TradesRelay( - brokerd_stream=brokerd_trades_stream, - positions=pps, - accounts=accounts, - consumers=1, - ) - - self.relays[broker] = relay - - # spawn a ``brokerd`` order control dialog stream - # that syncs lifetime with the parent `emsd` daemon. - self.nursery.start_soon( - translate_and_relay_brokerd_events, - broker, - relay.brokerd_stream, - self, - ) - - # dark book clearing loop, also lives with parent - # daemon to allow dark order clearing while no - # client is connected. - self.nursery.start_soon( - clear_dark_triggers, - self, - relay.brokerd_stream, - quote_stream, - broker, - fqsn, # form: ... 
-                book
-
-                task_status.started((relay, feed))
-
-                # this context should block here indefinitely until
-                # the ``brokerd`` task either dies or is cancelled
-                await trio.sleep_forever()
-
-            finally:
-                # parent context must have been closed remove from cache so
-                # next client will respawn if needed
-                relay = self.relays.pop(broker, None)
-                if not relay:
-                    log.warning(f'Relay for {broker} was already removed!?')

     async def client_broadcast(
         self,
@@ -584,7 +610,7 @@ class Router(Struct):
                 and notify_on_headless
             ):
                 log.info(
-                    'No clients attached, firing notification for msg:\n'
+                    f'No clients attached, firing notification for {sub_key} msg:\n'
                     f'{msg}'
                 )
                 await notify_from_ems_status_msg(
@@ -645,7 +671,7 @@ async def translate_and_relay_brokerd_events(
     {'presubmitted', 'submitted', 'cancelled', 'inactive'}

     '''
-    book: _DarkBook = router.get_dark_book(broker)
+    book: DarkBook = router.get_dark_book(broker)
     relay: TradesRelay = router.relays[broker]

     assert relay.brokerd_stream == brokerd_trades_stream
@@ -885,7 +911,11 @@ async def translate_and_relay_brokerd_events(
                 # use backend request id as our ems id though this
                 # may end up with collisions?
                 status_msg = Status(**brokerd_msg)
+
+                # NOTE: be sure to pack an fqsn for the client side!
order = Order(**status_msg.req) + order.symbol = mk_fqsn(broker, order.symbol) + assert order.price and order.size status_msg.req = order @@ -961,7 +991,7 @@ async def process_client_order_cmds( fqsn: str, feed: Feed, - dark_book: _DarkBook, + dark_book: DarkBook, router: Router, ) -> None: @@ -1042,7 +1072,7 @@ async def process_client_order_cmds( and status.resp == 'dark_open' ): # remove from dark book clearing - entry = dark_book.orders[fqsn].pop(oid, None) + entry = dark_book.triggers[fqsn].pop(oid, None) if entry: ( pred, @@ -1204,7 +1234,7 @@ async def process_client_order_cmds( # submit execution/order to EMS scan loop # NOTE: this may result in an override of an existing # dark book entry if the order id already exists - dark_book.orders.setdefault( + dark_book.triggers.setdefault( fqsn, {} )[oid] = ( pred, @@ -1287,7 +1317,6 @@ async def _emsd_main( global _router assert _router - from ..data._source import unpack_fqsn broker, symbol, suffix = unpack_fqsn(fqsn) # TODO: would be nice if in tractor we can require either a ctx arg, @@ -1298,14 +1327,15 @@ async def _emsd_main( # spawn one task per broker feed relay: TradesRelay feed: Feed + client_ready: trio.Event # open a stream with the brokerd backend for order flow dialogue # only open if one isn't already up: we try to keep as few duplicate # streams as necessary. # TODO: should we try using `tractor.trionics.maybe_open_context()` # here? - relay, feed = await _router.nursery.start( - _router.maybe_open_trade_relays, + relay, feed, client_ready = await _router.nursery.start( + _router.open_trade_relays, fqsn, exec_mode, loglevel, @@ -1333,6 +1363,7 @@ async def _emsd_main( # allowed to see in terms of broadcasted order flow # updates per dialog. _router.subscribers[fqsn].add(client_stream) + client_ready.set() # start inbound (from attached client) order request processing # main entrypoint, run here until cancelled. 
From 94f81587abcfbc2ed0d2d25e4c3550d942b2b201 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 Oct 2022 17:37:05 -0400 Subject: [PATCH 13/14] Cache EMS trade relay tasks on feed fqsn Except for paper accounts (in which case we need a trades dialog and paper engine per symbol to enable simulated clearing) we can rely on the instrument feed (symbol name) to be the caching key. Utilize `tractor.trionics.maybe_open_context()` and the new key-as-callable support in the paper case to ensure we have separate paper clearing loops per symbol. Requires https://github.com/goodboy/tractor/pull/329 --- piker/clearing/_ems.py | 154 +++++++++++++++++++++++++---------------- 1 file changed, 95 insertions(+), 59 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 6fe815dc..34486a91 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -31,6 +31,7 @@ from typing import ( AsyncIterator, Any, Callable, + Hashable, Optional, ) @@ -1265,6 +1266,24 @@ async def process_client_order_cmds( ) +@acm +async def maybe_open_trade_relays( + router: Router, + fqsn: str, + exec_mode: str, # ('paper', 'live') + loglevel: str = 'info', + +) -> tuple: + + relay, feed, client_ready = await _router.nursery.start( + _router.open_trade_relays, + fqsn, + exec_mode, + loglevel, + ) + yield relay, feed, client_ready + + @tractor.context async def _emsd_main( ctx: tractor.Context, @@ -1329,69 +1348,86 @@ async def _emsd_main( feed: Feed client_ready: trio.Event - # open a stream with the brokerd backend for order flow dialogue - # only open if one isn't already up: we try to keep as few duplicate - # streams as necessary. - # TODO: should we try using `tractor.trionics.maybe_open_context()` - # here? 
- relay, feed, client_ready = await _router.nursery.start( - _router.open_trade_relays, - fqsn, - exec_mode, - loglevel, - ) - brokerd_stream = relay.brokerd_stream - dark_book = _router.get_dark_book(broker) + # NOTE: open a stream with the brokerd backend for order flow + # dialogue and dark clearing but only open one: we try to keep as + # few duplicate streams as necessary per ems actor. + def cache_on_fqsn_unless_paper( + router: Router, + fqsn: str, + exec_mode: str, # ('paper', 'live') + loglevel: str = 'info', + ) -> Hashable: + if exec_mode == 'paper': + return f'paper_{fqsn}' + else: + return fqsn - # signal to client that we're started and deliver - # all known pps and accounts for this ``brokerd``. - await ems_ctx.started(( - relay.positions, - list(relay.accounts), - dark_book._active, - )) + async with tractor.trionics.maybe_open_context( + acm_func=maybe_open_trade_relays, + kwargs={ + 'router': _router, + 'fqsn': fqsn, + 'exec_mode': exec_mode, + 'loglevel': loglevel, + }, + key=cache_on_fqsn_unless_paper, - # establish 2-way stream with requesting order-client and - # begin handling inbound order requests and updates - async with ems_ctx.open_stream() as client_stream: + ) as ( + cache_hit, + (relay, feed, client_ready) + ): + brokerd_stream = relay.brokerd_stream + dark_book = _router.get_dark_book(broker) - # register the client side before starting the - # brokerd-side relay task to ensure the client is - # delivered all exisiting open orders on startup. - # TODO: instead of by fqsn we need a subscription - # system/schema here to limit what each new client is - # allowed to see in terms of broadcasted order flow - # updates per dialog. - _router.subscribers[fqsn].add(client_stream) - client_ready.set() + # signal to client that we're started and deliver + # all known pps and accounts for this ``brokerd``. 
+ await ems_ctx.started(( + relay.positions, + list(relay.accounts), + dark_book._active, + )) - # start inbound (from attached client) order request processing - # main entrypoint, run here until cancelled. - try: - await process_client_order_cmds( - client_stream, - brokerd_stream, - fqsn, - feed, - dark_book, - _router, - ) + # establish 2-way stream with requesting order-client and + # begin handling inbound order requests and updates + async with ems_ctx.open_stream() as client_stream: - finally: - # try to remove client from subscription registry - _router.subscribers[fqsn].remove(client_stream) + # register the client side before starting the + # brokerd-side relay task to ensure the client is + # delivered all exisiting open orders on startup. + # TODO: instead of by fqsn we need a subscription + # system/schema here to limit what each new client is + # allowed to see in terms of broadcasted order flow + # updates per dialog. + _router.subscribers[fqsn].add(client_stream) + client_ready.set() - for oid, client_streams in _router.dialogs.items(): - client_streams.discard(client_stream) + # start inbound (from attached client) order request processing + # main entrypoint, run here until cancelled. + try: + await process_client_order_cmds( + client_stream, + brokerd_stream, + fqsn, + feed, + dark_book, + _router, + ) - # TODO: for order dialogs left "alive" in - # the ems this is where we should allow some - # system to take over management. Likely we - # want to allow the user to choose what kind - # of policy to use (eg. cancel all orders - # from client, run some algo, etc.) 
- if not client_streams: - log.warning( - f'Order dialog is not being monitored:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' - ) + finally: + # try to remove client from subscription registry + _router.subscribers[fqsn].remove(client_stream) + + for oid, client_streams in _router.dialogs.items(): + client_streams.discard(client_stream) + + # TODO: for order dialogs left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.) + if not client_streams: + log.warning( + f'Order dialog is not being monitored:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + ) From c437f9370a279d8665eb3c5724f4070ef5347cfb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 7 Oct 2022 13:16:01 -0400 Subject: [PATCH 14/14] Factor out all `maybe_open_context()` guff --- piker/clearing/_ems.py | 80 +++++++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 34486a91..3bada0c3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -1275,13 +1275,48 @@ async def maybe_open_trade_relays( ) -> tuple: - relay, feed, client_ready = await _router.nursery.start( - _router.open_trade_relays, - fqsn, - exec_mode, - loglevel, - ) - yield relay, feed, client_ready + def cache_on_fqsn_unless_paper( + router: Router, + fqsn: str, + exec_mode: str, # ('paper', 'live') + loglevel: str = 'info', + ) -> Hashable: + if exec_mode == 'paper': + return f'paper_{fqsn}' + else: + return fqsn + + # XXX: closure to enable below use of + # ``tractor.trionics.maybe_open_context()`` + @acm + async def cached_mngr( + router: Router, + fqsn: str, + exec_mode: str, # ('paper', 'live') + loglevel: str = 'info', + ): + relay, feed, client_ready = await _router.nursery.start( + _router.open_trade_relays, + fqsn, + exec_mode, + 
loglevel, + ) + yield relay, feed, client_ready + + async with tractor.trionics.maybe_open_context( + acm_func=cached_mngr, + kwargs={ + 'router': _router, + 'fqsn': fqsn, + 'exec_mode': exec_mode, + 'loglevel': loglevel, + }, + key=cache_on_fqsn_unless_paper, + ) as ( + cache_hit, + (relay, feed, client_ready) + ): + yield relay, feed, client_ready @tractor.context @@ -1351,31 +1386,13 @@ async def _emsd_main( # NOTE: open a stream with the brokerd backend for order flow # dialogue and dark clearing but only open one: we try to keep as # few duplicate streams as necessary per ems actor. - def cache_on_fqsn_unless_paper( - router: Router, - fqsn: str, - exec_mode: str, # ('paper', 'live') - loglevel: str = 'info', - ) -> Hashable: - if exec_mode == 'paper': - return f'paper_{fqsn}' - else: - return fqsn + async with maybe_open_trade_relays( + _router, + fqsn, + exec_mode, + loglevel, + ) as (relay, feed, client_ready): - async with tractor.trionics.maybe_open_context( - acm_func=maybe_open_trade_relays, - kwargs={ - 'router': _router, - 'fqsn': fqsn, - 'exec_mode': exec_mode, - 'loglevel': loglevel, - }, - key=cache_on_fqsn_unless_paper, - - ) as ( - cache_hit, - (relay, feed, client_ready) - ): brokerd_stream = relay.brokerd_stream dark_book = _router.get_dark_book(broker) @@ -1412,7 +1429,6 @@ async def _emsd_main( dark_book, _router, ) - finally: # try to remove client from subscription registry _router.subscribers[fqsn].remove(client_stream)