diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 11eb9b69..03fb62d3 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -19,15 +19,14 @@ Orders and execution client API. """ from contextlib import asynccontextmanager as acm -from typing import Dict from pprint import pformat -from dataclasses import dataclass, field import trio import tractor from tractor.trionics import broadcast_receiver from ..log import get_logger +from ..data.types import Struct from ._ems import _emsd_main from .._daemon import maybe_open_emsd from ._messages import Order, Cancel @@ -37,8 +36,7 @@ from ..brokers import get_brokermod log = get_logger(__name__) -@dataclass -class OrderBook: +class OrderBook(Struct): '''EMS-client-side order book ctl and tracking. A style similar to "model-view" is used here where this api is @@ -53,9 +51,7 @@ class OrderBook: # mem channels used to relay order requests to the EMS daemon _to_ems: trio.abc.SendChannel _from_order_book: trio.abc.ReceiveChannel - - _sent_orders: Dict[str, Order] = field(default_factory=dict) - _ready_to_receive: trio.Event = trio.Event() + _sent_orders: dict[str, Order] = {} def send( self, @@ -66,7 +62,7 @@ class OrderBook: self._to_ems.send_nowait(msg) return msg - def update( + def send_update( self, uuid: str, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 23b50ddf..3bada0c3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -23,7 +23,7 @@ from collections import ( defaultdict, # ChainMap, ) -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from math import isnan from pprint import pformat import time @@ -31,6 +31,7 @@ from typing import ( AsyncIterator, Any, Callable, + Hashable, Optional, ) @@ -41,9 +42,16 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed, maybe_open_feed +from ..data._source import ( + unpack_fqsn, + mk_fqsn, +) +from ..data.feed import ( + Feed, + maybe_open_feed, +) +from ..ui._notify import notify_from_ems_status_msg from ..data.types import Struct -from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( Order, @@ -102,7 +110,7 @@ def mk_check( ) -class _DarkBook(Struct): +class DarkBook(Struct): ''' EMS-trigger execution book. @@ -117,7 +125,7 @@ class _DarkBook(Struct): broker: str # levels which have an executable action (eg. alert, order, signal) - orders: dict[ + triggers: dict[ str, # symbol dict[ str, # uuid @@ -129,15 +137,8 @@ class _DarkBook(Struct): ] ] = {} - # tracks most recent values per symbol each from data feed - lasts: dict[ - str, - float, - ] = {} - - # _ems_entries: dict[str, str] = {} - _active: dict = {} - + lasts: dict[str, float] = {} # quote prices + _active: dict[str, Status] = {} # active order dialogs _ems2brokerd_ids: dict[str, str] = bidict() @@ -156,7 +157,7 @@ async def clear_dark_triggers( broker: str, fqsn: str, - book: _DarkBook, + book: DarkBook, ) -> None: ''' @@ -173,7 +174,7 @@ async def clear_dark_triggers( async for quotes in quote_stream: # start = time.time() for sym, quote in quotes.items(): - execs = book.orders.get(sym, {}) + execs = book.triggers.get(sym, {}) for tick in iterticks( quote, # dark order price filter(s) @@ -215,6 +216,7 @@ async def clear_dark_triggers( brokerd_msg: Optional[BrokerdOrder] = None match cmd: + # alert: nothing to do but relay a status # back to the requesting ems client case Order(action='alert'): @@ -244,12 +246,8 @@ async def clear_dark_triggers( price=submit_price, size=size, ) - await brokerd_orders_stream.send(brokerd_msg) - # book._ems_entries[oid] = live_req - # book._msgflows[oid].maps.insert(0, live_req) - case _: raise ValueError(f'Invalid dark book entry: {cmd}') @@ -294,7 +292,7 @@ async def clear_dark_triggers( else: # condition scan loop complete log.debug(f'execs are {execs}') if execs: - book.orders[fqsn] = execs + book.triggers[fqsn] = execs # print(f'execs scan took: {time.time() - start}') @@ -331,10 +329,7 @@ class Router(Struct): nursery: trio.Nursery # broker to book map - books: dict[str, _DarkBook] = {} - - # order id to client stream map - clients: set[tractor.MsgStream] = set() + books: dict[str, DarkBook] = {} # sets of clients mapped from subscription keys subscribers: defaultdict[ @@ -365,9 +360,9 @@ class Router(Struct): self, brokername: str, - ) -> _DarkBook: + ) -> DarkBook: - return self.books.setdefault(brokername, _DarkBook(brokername)) + return self.books.setdefault(brokername, DarkBook(brokername)) def get_subs( self, @@ -383,123 +378,33 @@ class Router(Struct): if not stream._closed ) - @asynccontextmanager - async def maybe_open_brokerd_trades_dialogue( + @acm + async def maybe_open_brokerd_dialog( self, feed: Feed, - symbol: str, - dark_book: _DarkBook, exec_mode: str, + symbol: str, loglevel: str, - ) -> tuple[dict, tractor.MsgStream]: - ''' - Open and yield ``brokerd`` trades dialogue context-stream if - none already exists. - - ''' - broker = feed.mod.name - relay: TradesRelay = self.relays.get(broker) - - if ( - relay is None - - # We always want to spawn a new relay for the paper engine - # per symbol since we need a new tractor context to be - # opened for every every symbol such that a new data feed - # and ``PaperBoi`` client will be created and then used to - # simulate clearing events. - or exec_mode == 'paper' - ): - - relay = await self.nursery.start( - open_brokerd_trades_dialog, - self, - feed, - symbol, - exec_mode, - loglevel, - ) - - self.nursery.start_soon( - translate_and_relay_brokerd_events, - broker, - relay.brokerd_stream, - self, - ) - - relay.consumers += 1 - - # TODO: get updated positions here? - assert relay.brokerd_stream - try: - yield relay - finally: - - # TODO: what exactly needs to be torn down here or - # are we just consumer tracking? - relay.consumers -= 1 - - async def client_broadcast( - self, - sub_key: str, - msg: dict, - ) -> None: - to_remove: set[tractor.MsgStream] = set() - subs = self.subscribers[sub_key] - for client_stream in subs: - try: - await client_stream.send(msg) - except ( - trio.ClosedResourceError, - trio.BrokenResourceError, - ): - to_remove.add(client_stream) - self.clients.remove(client_stream) - log.warning( - f'client for {client_stream} was already closed?') + brokermod = feed.mod + broker = brokermod.name + relay: TradesRelay = self.relays.get(broker) + if ( + relay - if to_remove: - subs.difference_update(to_remove) + # We always want to spawn a new relay for the paper + # engine per symbol since we need a new tractor context + # to be opened for every every symbol such that a new + # data feed and ``PaperBoi`` client will be created and + # then used to simulate clearing events. + and exec_mode != 'paper' + ): + # deliver already cached instance + yield relay + return - -_router: Router = None - - -async def open_brokerd_trades_dialog( - - router: Router, - feed: Feed, - symbol: str, - exec_mode: str, - loglevel: str, - - task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, - -) -> tuple[dict, tractor.MsgStream]: - ''' - Open and yield ``brokerd`` trades dialogue context-stream if none - already exists. - - ''' - trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) - - broker = feed.mod.name - - # TODO: make a `tractor` bug/test for this! - # if only i could member what the problem was.. - # probably some GC of the portal thing? - # portal = feed.portal - - # XXX: we must have our own portal + channel otherwise - # when the data feed closes it may result in a half-closed - # channel that the brokerd side thinks is still open somehow!? - async with maybe_spawn_brokerd( - broker, - loglevel=loglevel, - - ) as portal: + trades_endpoint = getattr(brokermod, 'trades_dialogue', None) if ( trades_endpoint is None or exec_mode == 'paper' @@ -522,77 +427,201 @@ async def open_brokerd_trades_dialog( else: # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( + open_trades_endpoint = feed.portal.open_context( trades_endpoint, loglevel=loglevel, ) - try: - positions: list[BrokerdPosition] - accounts: tuple[str] + # open trades-dialog endpoint with backend broker + positions: list[BrokerdPosition] + accounts: tuple[str] - async with ( - open_trades_endpoint as (brokerd_ctx, (positions, accounts,)), - brokerd_ctx.open_stream() as brokerd_trades_stream, - ): - # XXX: really we only want one stream per `emsd` actor - # to relay global `brokerd` order events unless we're - # going to expect each backend to relay only orders - # affiliated with a particular ``trades_dialogue()`` - # session (seems annoying for implementers). So, here - # we cache the relay task and instead of running multiple - # tasks (which will result in multiples of the same msg being - # relayed for each EMS client) we just register each client - # stream to this single relay loop in the dialog table. + async with ( + open_trades_endpoint as ( + brokerd_ctx, + (positions, accounts,), + ), + brokerd_ctx.open_stream() as brokerd_trades_stream, + ): + # XXX: really we only want one stream per `emsd` + # actor to relay global `brokerd` order events + # unless we're going to expect each backend to + # relay only orders affiliated with a particular + # ``trades_dialogue()`` session (seems annoying + # for implementers). So, here we cache the relay + # task and instead of running multiple tasks + # (which will result in multiples of the same + # msg being relayed for each EMS client) we just + # register each client stream to this single + # relay loop in the dialog table. - # begin processing order events from the target brokerd backend - # by receiving order submission response messages, - # normalizing them to EMS messages and relaying back to - # the piker order client set. + # begin processing order events from the target + # brokerd backend by receiving order submission + # response messages, normalizing them to EMS + # messages and relaying back to the piker order + # client set. - # locally cache and track positions per account with - # a table of (brokername, acctid) -> `BrokerdPosition` - # msgs. - pps = {} - for msg in positions: - log.info(f'loading pp: {msg}') + # locally cache and track positions per account with + # a table of (brokername, acctid) -> `BrokerdPosition` + # msgs. + pps = {} + for msg in positions: + log.info(f'loading pp: {msg}') - account = msg['account'] + account = msg['account'] - # TODO: better value error for this which - # dumps the account and message and states the - # mismatch.. - assert account in accounts + # TODO: better value error for this which + # dumps the account and message and states the + # mismatch.. + assert account in accounts - pps.setdefault( - (broker, account), - [], - ).append(msg) + pps.setdefault( + (broker, account), + [], + ).append(msg) - relay = TradesRelay( - brokerd_stream=brokerd_trades_stream, - positions=pps, - accounts=accounts, - consumers=1, + relay = TradesRelay( + brokerd_stream=brokerd_trades_stream, + positions=pps, + accounts=accounts, + consumers=1, + ) + + self.relays[broker] = relay + + # this context should block here indefinitely until + # the ``brokerd`` task either dies or is cancelled + try: + yield relay + finally: + # parent context must have been closed remove from cache so + # next client will respawn if needed + relay = self.relays.pop(broker, None) + if not relay: + log.warning( + f'Relay for {broker} was already removed!?') + + async def open_trade_relays( + self, + fqsn: str, + exec_mode: str, + loglevel: str, + + task_status: TaskStatus[ + tuple[TradesRelay, Feed] + ] = trio.TASK_STATUS_IGNORED, + + ) -> tuple[TradesRelay, Feed]: + ''' + Open and yield ``brokerd`` trades dialogue context-stream if + none already exists. + + ''' + from ..data._source import unpack_fqsn + broker, symbol, suffix = unpack_fqsn(fqsn) + + async with ( + maybe_open_feed( + [fqsn], + loglevel=loglevel, + ) as (feed, quote_stream), + ): + brokermod = feed.mod + broker = brokermod.name + + # XXX: this should be initial price quote from target provider + first_quote: dict = feed.first_quotes[fqsn] + book: DarkBook = self.get_dark_book(broker) + book.lasts[fqsn]: float = first_quote['last'] + + async with self.maybe_open_brokerd_dialog( + feed=feed, + exec_mode=exec_mode, + symbol=symbol, + loglevel=loglevel, + ) as relay: + + # dark book clearing loop, also lives with parent + # daemon to allow dark order clearing while no + # client is connected. + self.nursery.start_soon( + clear_dark_triggers, + self, + relay.brokerd_stream, + quote_stream, + broker, + fqsn, # form: ... + book ) - router.relays[broker] = relay + client_ready = trio.Event() + task_status.started((relay, feed, client_ready)) - # the ems scan loop may be cancelled by the client but we - # want to keep the ``brokerd`` dialogue up regardless + # sync to the client side by waiting for the stream + # connection setup before relaying any existing live + # orders from the brokerd. + await client_ready.wait() + assert self.subscribers - task_status.started(relay) + # spawn a ``brokerd`` order control dialog stream + # that syncs lifetime with the parent `emsd` daemon. + self.nursery.start_soon( + translate_and_relay_brokerd_events, + broker, + relay.brokerd_stream, + self, + ) - # this context should block here indefinitely until - # the ``brokerd`` task either dies or is cancelled await trio.sleep_forever() - finally: - # parent context must have been closed remove from cache so - # next client will respawn if needed - relay = router.relays.pop(broker, None) - if not relay: - log.warning(f'Relay for {broker} was already removed!?') + async def client_broadcast( + self, + sub_key: str, + msg: dict, + notify_on_headless: bool = True, + + ) -> bool: + to_remove: set[tractor.MsgStream] = set() + + if sub_key == 'all': + subs = set() + for s in self.subscribers.values(): + subs |= s + else: + subs = self.subscribers[sub_key] + + sent_some: bool = False + for client_stream in subs: + try: + await client_stream.send(msg) + sent_some = True + except ( + trio.ClosedResourceError, + trio.BrokenResourceError, + ): + to_remove.add(client_stream) + log.warning( + f'client for {client_stream} was already closed?') + + if to_remove: + subs.difference_update(to_remove) + + if ( + not sent_some + and notify_on_headless + ): + log.info( + 'No clients attached, firing notification for {sub_key} msg:\n' + f'{msg}' + ) + await notify_from_ems_status_msg( + msg, + is_subproc=True, + ) + return sent_some + + +_router: Router = None @tractor.context @@ -643,7 +672,7 @@ async def translate_and_relay_brokerd_events( {'presubmitted', 'submitted', 'cancelled', 'inactive'} ''' - book: _DarkBook = router.get_dark_book(broker) + book: DarkBook = router.get_dark_book(broker) relay: TradesRelay = router.relays[broker] assert relay.brokerd_stream == brokerd_trades_stream @@ -677,7 +706,9 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - await router.client_broadcast(sym, pos_msg) + # TODO: this should be subscription based for privacy + # eventually! + await router.client_broadcast('all', pos_msg) continue # BrokerdOrderAck @@ -769,13 +800,8 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_streams = router.get_subs(oid) status_msg = book._active.get(oid) - - if ( - not ems_client_order_streams - or not status_msg - ): + if not status_msg: log.warning( f'Received status for untracked dialog {oid}:\n' f'{fmsg}' @@ -827,7 +853,7 @@ async def translate_and_relay_brokerd_events( # ``.oid`` in the msg since we're planning to # maybe-kinda offer that via using ``Status`` # in the longer run anyway? - log.warning(f'Unkown fill for {fmsg}') + log.warning(f'Unknown fill for {fmsg}') continue # proxy through the "fill" result(s) @@ -843,6 +869,11 @@ async def translate_and_relay_brokerd_events( status_msg.reqid = reqid status_msg.brokerd_msg = msg + # TODO: if no client is connected (aka we're + # headless) we should record the fill in the + # ``.msg_flow`` chain and re-transmit on client + # connect so that fills can be displayed in a + # chart? await router.client_broadcast( status_msg.req.symbol, status_msg, @@ -881,7 +912,11 @@ async def translate_and_relay_brokerd_events( # use backend request id as our ems id though this # may end up with collisions? status_msg = Status(**brokerd_msg) + + # NOTE: be sure to pack an fqsn for the client side! order = Order(**status_msg.req) + order.symbol = mk_fqsn(broker, order.symbol) + assert order.price and order.size status_msg.req = order @@ -957,7 +992,7 @@ async def process_client_order_cmds( fqsn: str, feed: Feed, - dark_book: _DarkBook, + dark_book: DarkBook, router: Router, ) -> None: @@ -1026,7 +1061,6 @@ async def process_client_order_cmds( # acked yet by a brokerd, so register a cancel for when # the order ack does show up later such that the brokerd # order request can be cancelled at that time. - # dark_book._ems_entries[oid] = msg # special case for now.. status.req = to_brokerd_msg @@ -1039,7 +1073,7 @@ async def process_client_order_cmds( and status.resp == 'dark_open' ): # remove from dark book clearing - entry = dark_book.orders[fqsn].pop(oid, None) + entry = dark_book.triggers[fqsn].pop(oid, None) if entry: ( pred, @@ -1201,7 +1235,7 @@ async def process_client_order_cmds( # submit execution/order to EMS scan loop # NOTE: this may result in an override of an existing # dark book entry if the order id already exists - dark_book.orders.setdefault( + dark_book.triggers.setdefault( fqsn, {} )[oid] = ( pred, @@ -1232,6 +1266,59 @@ async def process_client_order_cmds( ) +@acm +async def maybe_open_trade_relays( + router: Router, + fqsn: str, + exec_mode: str, # ('paper', 'live') + loglevel: str = 'info', + +) -> tuple: + + def cache_on_fqsn_unless_paper( + router: Router, + fqsn: str, + exec_mode: str, # ('paper', 'live') + loglevel: str = 'info', + ) -> Hashable: + if exec_mode == 'paper': + return f'paper_{fqsn}' + else: + return fqsn + + # XXX: closure to enable below use of + # ``tractor.trionics.maybe_open_context()`` + @acm + async def cached_mngr( + router: Router, + fqsn: str, + exec_mode: str, # ('paper', 'live') + loglevel: str = 'info', + ): + relay, feed, client_ready = await _router.nursery.start( + _router.open_trade_relays, + fqsn, + exec_mode, + loglevel, + ) + yield relay, feed, client_ready + + async with tractor.trionics.maybe_open_context( + acm_func=cached_mngr, + kwargs={ + 'router': _router, + 'fqsn': fqsn, + 'exec_mode': exec_mode, + 'loglevel': loglevel, + }, + key=cache_on_fqsn_unless_paper, + ) as ( + cache_hit, + (relay, feed, client_ready) + ): + yield relay, feed, client_ready + + @tractor.context async def _emsd_main( ctx: tractor.Context, @@ -1284,9 +1371,7 @@ async def _emsd_main( global _router assert _router - from ..data._source import unpack_fqsn broker, symbol, suffix = unpack_fqsn(fqsn) - dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, # or a named arg with ctx in it and a type annotation of @@ -1294,108 +1379,71 @@ async def _emsd_main( ems_ctx = ctx # spawn one task per broker feed + relay: TradesRelay feed: Feed - async with ( - maybe_open_feed( - [fqsn], - loglevel=loglevel, - ) as (feed, quote_stream), - ): + client_ready: trio.Event - # XXX: this should be initial price quote from target provider - first_quote: dict = feed.first_quotes[fqsn] - book: _DarkBook = _router.get_dark_book(broker) - book.lasts[fqsn]: float = first_quote['last'] + # NOTE: open a stream with the brokerd backend for order flow + # dialogue and dark clearing but only open one: we try to keep as + # few duplicate streams as necessary per ems actor. + async with maybe_open_trade_relays( + _router, + fqsn, + exec_mode, + loglevel, + ) as (relay, feed, client_ready): - # open a stream with the brokerd backend for order - # flow dialogue - async with ( + brokerd_stream = relay.brokerd_stream + dark_book = _router.get_dark_book(broker) - # only open if one isn't already up: we try to keep - # as few duplicate streams as necessary - _router.maybe_open_brokerd_trades_dialogue( - feed, - symbol, - dark_book, - exec_mode, - loglevel, + # signal to client that we're started and deliver + # all known pps and accounts for this ``brokerd``. + await ems_ctx.started(( + relay.positions, + list(relay.accounts), + dark_book._active, + )) - ) as relay, - trio.open_nursery() as n, - ): + # establish 2-way stream with requesting order-client and + # begin handling inbound order requests and updates + async with ems_ctx.open_stream() as client_stream: - brokerd_stream = relay.brokerd_stream + # register the client side before starting the + # brokerd-side relay task to ensure the client is + # delivered all exisiting open orders on startup. + # TODO: instead of by fqsn we need a subscription + # system/schema here to limit what each new client is + # allowed to see in terms of broadcasted order flow + # updates per dialog. + _router.subscribers[fqsn].add(client_stream) + client_ready.set() - # signal to client that we're started and deliver - # all known pps and accounts for this ``brokerd``. - await ems_ctx.started(( - relay.positions, - list(relay.accounts), - book._active, - )) - - # establish 2-way stream with requesting order-client and - # begin handling inbound order requests and updates - async with ems_ctx.open_stream() as client_stream: - - # register the client side before starting the - # brokerd-side relay task to ensure the client is - # delivered all exisiting open orders on startup. - _router.clients.add(client_stream) - - # TODO: instead of by fqsn we need a subscription - # system/schema here to limit what each new client is - # allowed to see in terms of broadcasted order flow - # updates per dialog. - _router.subscribers[fqsn].add(client_stream) - - # trigger scan and exec loop - n.start_soon( - clear_dark_triggers, - _router, + # start inbound (from attached client) order request processing + # main entrypoint, run here until cancelled. + try: + await process_client_order_cmds( + client_stream, brokerd_stream, - quote_stream, - broker, - fqsn, # form: ... - book + fqsn, + feed, + dark_book, + _router, ) + finally: + # try to remove client from subscription registry + _router.subscribers[fqsn].remove(client_stream) - # start inbound (from attached client) order request processing - # main entrypoint, run here until cancelled. - try: - await process_client_order_cmds( - client_stream, - brokerd_stream, - fqsn, - feed, - dark_book, - _router, - ) + for oid, client_streams in _router.dialogs.items(): + client_streams.discard(client_stream) - finally: - # try to remove client from "registry" - try: - _router.clients.remove(client_stream) - except KeyError: + # TODO: for order dialogs left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.) + if not client_streams: log.warning( - f'Stream {client_stream._ctx.chan.uid}' - ' was already dropped?' + f'Order dialog is not being monitored:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' ) - - _router.subscribers[fqsn].remove(client_stream) - dialogs = _router.dialogs - for oid, client_streams in dialogs.items(): - if client_stream in client_streams: - client_streams.remove(client_stream) - - # TODO: for order dialogs left "alive" in - # the ems this is where we should allow some - # system to take over management. Likely we - # want to allow the user to choose what kind - # of policy to use (eg. cancel all orders - # from client, run some algo, etc.) - if not client_streams: - log.warning( - f'Order dialog is being unmonitored:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' - ) diff --git a/piker/data/types.py b/piker/data/types.py index c23f6266..4bdb8063 100644 --- a/piker/data/types.py +++ b/piker/data/types.py @@ -49,10 +49,13 @@ class Struct( hasattr(sys, 'ps1') # TODO: check if we're in pdb ): - return f'Struct({pformat(self.to_dict())})' + return self.pformat() return super().__repr__() + def pformat(self) -> str: + return f'Struct({pformat(self.to_dict())})' + def copy( self, update: Optional[dict] = None, diff --git a/piker/ui/_notify.py b/piker/ui/_notify.py new file mode 100644 index 00000000..c3da8d4b --- /dev/null +++ b/piker/ui/_notify.py @@ -0,0 +1,95 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Notifications utils. + +""" +import os +import platform +import subprocess +from typing import Optional + +import trio + +from ..log import get_logger +from ..clearing._messages import ( + Status, +) + +log = get_logger(__name__) + + +_dbus_uid: Optional[str] = '' + + +async def notify_from_ems_status_msg( + msg: Status, + duration: int = 3000, + is_subproc: bool = False, + +) -> None: + ''' + Send a linux desktop notification. + + Handle subprocesses by discovering the dbus user id + on first call. + + ''' + if platform.system() != "Linux": + return + + # TODO: this in another task? + # not sure if this will ever be a bottleneck, + # we probably could do graphics stuff first tho? + + if is_subproc: + global _dbus_uid + if not _dbus_uid: + su = os.environ['SUDO_USER'] + + # TODO: use `trio` but we need to use nursery.start() + # to use pipes? + # result = await trio.run_process( + result = subprocess.run( + [ + 'id', + '-u', + su, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + # check=True + ) + _dbus_uid = result.stdout.decode("utf-8").replace('\n', '') + + os.environ['DBUS_SESSION_BUS_ADDRESS'] = ( + f'unix:path=/run/user/{_dbus_uid}/bus' + ) + + result = await trio.run_process( + [ + 'notify-send', + '-u', 'normal', + '-t', f'{duration}', + 'piker', + + # TODO: add in standard fill/exec info that maybe we + # pack in a broker independent way? + f"'{msg.pformat()}'", + ], + ) + log.runtime(result) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index c986022a..9ed5b7ec 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -42,7 +42,8 @@ from ._anchors import ( gpath_pin, ) from ..calc import humanize, pnl, puterize -from ..clearing._allocate import Allocator, Position +from ..clearing._allocate import Allocator +from ..pp import Position from ..data._normalize import iterticks from ..data.feed import Feed from ..data.types import Struct diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 59b07758..3cda56ff 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -23,7 +23,6 @@ from contextlib import asynccontextmanager from dataclasses import dataclass, field from functools import partial from pprint import pformat -import platform import time from typing import ( Optional, @@ -64,6 +63,7 @@ from ..clearing._messages import ( BrokerdPosition, ) from ._forms import open_form_input_handling +from ._notify import notify_from_ems_status_msg if TYPE_CHECKING: @@ -433,7 +433,7 @@ class OrderMode: size = dialog.order.size # NOTE: sends modified order msg to EMS - self.book.update( + self.book.send_update( uuid=line.dialog.uuid, price=level, size=size, @@ -530,39 +530,6 @@ class OrderMode: else: log.warn("No line(s) for order {uuid}!?") - async def on_exec( - self, - - uuid: str, - msg: Status, - - ) -> None: - - # DESKTOP NOTIFICATIONS - # - # TODO: this in another task? - # not sure if this will ever be a bottleneck, - # we probably could do graphics stuff first tho? - - # TODO: make this not trash. - # XXX: linux only for now - if platform.system() == "Windows": - return - - result = await trio.run_process( - [ - 'notify-send', - '-u', 'normal', - '-t', '1616', - 'piker', - - # TODO: add in standard fill/exec info that maybe we - # pack in a broker independent way? - f'{msg.resp}: {msg.req.price}', - ], - ) - log.runtime(result) - def on_cancel( self, uuid: str @@ -1064,7 +1031,7 @@ async def process_trade_msg( ) mode.lines.remove_line(uuid=oid) msg.req = req - await mode.on_exec(oid, msg) + await notify_from_ems_status_msg(msg) # response to completed 'dialog' for order request case Status( @@ -1073,19 +1040,19 @@ async def process_trade_msg( req=req, ): msg.req = Order(**req) - await mode.on_exec(oid, msg) + await notify_from_ems_status_msg(msg) mode.lines.remove_line(uuid=oid) # each clearing tick is responded individually case Status(resp='fill'): # handle out-of-piker fills reporting? - known_order = book._sent_orders.get(oid) - if not known_order: + order: Order = book._sent_orders.get(oid) + if not order: log.warning(f'order {oid} is unknown') - return + order = msg.req - action = known_order.action + action = order.action details = msg.brokerd_msg # TODO: some kinda progress system @@ -1110,7 +1077,9 @@ async def process_trade_msg( ), ) - # TODO: how should we look this up? + # TODO: append these fill events to the position's clear + # table? + # tracker = mode.trackers[msg['account']] # tracker.live_pp.fills.append(msg)