diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 84983808..74a56917 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -18,7 +18,7 @@ Orders and execution client API. """ -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from typing import Dict from pprint import pformat from dataclasses import dataclass, field @@ -27,7 +27,6 @@ import trio import tractor from tractor.trionics import broadcast_receiver -from ..data._source import Symbol from ..log import get_logger from ._ems import _emsd_main from .._daemon import maybe_open_emsd @@ -156,16 +155,19 @@ async def relay_order_cmds_from_sync_code( await to_ems_stream.send(cmd) -@asynccontextmanager +@acm async def open_ems( - broker: str, - symbol: Symbol, + fqsn: str, -) -> (OrderBook, tractor.MsgStream, dict): - """Spawn an EMS daemon and begin sending orders and receiving +) -> ( + OrderBook, + tractor.MsgStream, + dict, +): + ''' + Spawn an EMS daemon and begin sending orders and receiving alerts. - This EMS tries to reduce most broker's terrible order entry apis to a very simple protocol built on a few easy to grok and/or "rantsy" premises: @@ -194,21 +196,22 @@ async def open_ems( - 'dark_executed', 'broker_executed' - 'broker_filled' - """ + ''' # wait for service to connect back to us signalling # ready for order commands book = get_orders() + from ..data._source import uncons_fqsn + broker, symbol, suffix = uncons_fqsn(fqsn) + async with maybe_open_emsd(broker) as portal: async with ( - # connect to emsd portal.open_context( _emsd_main, - broker=broker, - symbol=symbol.key, + fqsn=fqsn, ) as (ctx, (positions, accounts)), @@ -218,7 +221,7 @@ async def open_ems( async with trio.open_nursery() as n: n.start_soon( relay_order_cmds_from_sync_code, - symbol.key, + fqsn, trades_stream ) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 630405ea..1ebab7ce 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -20,7 +20,7 @@ In da suit parlances: "Execution management systems" """ from contextlib import asynccontextmanager from dataclasses import dataclass, field -from math import isnan +# from math import isnan from pprint import pformat import time from typing import AsyncIterator, Callable @@ -113,8 +113,8 @@ class _DarkBook: # tracks most recent values per symbol each from data feed lasts: dict[ - tuple[str, str], - float + str, + float, ] = field(default_factory=dict) # mapping of piker ems order ids to current brokerd order flow message @@ -135,7 +135,7 @@ async def clear_dark_triggers( ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa broker: str, - symbol: str, + fqsn: str, book: _DarkBook, @@ -155,7 +155,6 @@ async def clear_dark_triggers( # start = time.time() for sym, quote in quotes.items(): execs = book.orders.get(sym, {}) - for tick in iterticks( quote, # dark order price filter(s) @@ -171,7 +170,7 @@ async def clear_dark_triggers( ttype = tick['type'] # update to keep new cmds informed - book.lasts[(broker, symbol)] = price + book.lasts[sym] = price for oid, ( pred, @@ -196,6 +195,7 @@ async def clear_dark_triggers( action: str = cmd['action'] symbol: str = cmd['symbol'] + bfqsn: str = symbol.replace(f'.{broker}', '') if action == 'alert': # nothing to do but relay a status @@ -225,7 +225,7 @@ async def clear_dark_triggers( # order-request and instead create a new one. reqid=None, - symbol=sym, + symbol=bfqsn, price=submit_price, size=cmd['size'], ) @@ -247,12 +247,9 @@ async def clear_dark_triggers( oid=oid, # ems order id resp=resp, time_ns=time.time_ns(), - - symbol=symbol, + symbol=fqsn, trigger_price=price, - broker_details={'name': broker}, - cmd=cmd, # original request message ).dict() @@ -270,7 +267,7 @@ async def clear_dark_triggers( else: # condition scan loop complete log.debug(f'execs are {execs}') if execs: - book.orders[symbol] = execs + book.orders[fqsn] = execs # print(f'execs scan took: {time.time() - start}') @@ -382,7 +379,8 @@ async def open_brokerd_trades_dialogue( task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, ) -> tuple[dict, tractor.MsgStream]: - '''Open and yield ``brokerd`` trades dialogue context-stream if none + ''' + Open and yield ``brokerd`` trades dialogue context-stream if none already exists. ''' @@ -458,12 +456,13 @@ async def open_brokerd_trades_dialogue( # locally cache and track positions per account. pps = {} for msg in positions: + log.info(f'loading pp: {msg}') account = msg['account'] assert account in accounts pps.setdefault( - msg['symbol'], + f'{msg["symbol"]}.{broker}', {} )[account] = msg @@ -563,7 +562,13 @@ async def translate_and_relay_brokerd_events( # XXX: this will be useful for automatic strats yah? # keep pps per account up to date locally in ``emsd`` mem - relay.positions.setdefault(pos_msg['symbol'], {}).setdefault( + sym, broker = pos_msg['symbol'], pos_msg['broker'] + + relay.positions.setdefault( + # NOTE: translate to a FQSN! + f'{sym}.{broker}', + {} + ).setdefault( pos_msg['account'], {} ).update(pos_msg) @@ -840,11 +845,15 @@ async def process_client_order_cmds( msg = Order(**cmd) - sym = msg.symbol + fqsn = msg.symbol trigger_price = msg.price size = msg.size exec_mode = msg.exec_mode broker = msg.brokers[0] + # remove the broker part before creating a message + # to send to the specific broker since they probably + # aren't expectig their own name, but should they? + sym = fqsn.replace(f'.{broker}', '') if exec_mode == 'live' and action in ('buy', 'sell',): @@ -902,7 +911,7 @@ async def process_client_order_cmds( # price received from the feed, instead of being # like every other shitty tina platform that makes # the user choose the predicate operator. - last = dark_book.lasts[(broker, sym)] + last = dark_book.lasts[fqsn] pred = mk_check(trigger_price, last, action) spread_slap: float = 5 @@ -933,7 +942,7 @@ async def process_client_order_cmds( # dark book entry if the order id already exists dark_book.orders.setdefault( - sym, {} + fqsn, {} )[oid] = ( pred, tickfilter, @@ -960,8 +969,8 @@ async def process_client_order_cmds( async def _emsd_main( ctx: tractor.Context, - broker: str, - symbol: str, + fqsn: str, + _exec_mode: str = 'dark', # ('paper', 'dark', 'live') loglevel: str = 'info', @@ -1003,6 +1012,8 @@ async def _emsd_main( global _router assert _router + from ..data._source import uncons_fqsn + broker, symbol, suffix = uncons_fqsn(fqsn) dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, @@ -1015,17 +1026,16 @@ async def _emsd_main( # spawn one task per broker feed async with ( maybe_open_feed( - broker, - [symbol], + [fqsn], loglevel=loglevel, - ) as (feed, stream), + ) as (feed, quote_stream), ): # XXX: this should be initial price quote from target provider - first_quote = feed.first_quotes[symbol] + first_quote = feed.first_quotes[fqsn] book = _router.get_dark_book(broker) - last = book.lasts[(broker, symbol)] = first_quote['last'] + book.lasts[fqsn] = first_quote['last'] # XXX: ib is a cucker but we've fixed avoiding receiving any # `Nan`s in the backend during market hours (right?). this was @@ -1054,8 +1064,8 @@ async def _emsd_main( # flatten out collected pps from brokerd for delivery pp_msgs = { - sym: list(pps.values()) - for sym, pps in relay.positions.items() + fqsn: list(pps.values()) + for fqsn, pps in relay.positions.items() } # signal to client that we're started and deliver @@ -1072,9 +1082,9 @@ async def _emsd_main( brokerd_stream, ems_client_order_stream, - stream, + quote_stream, broker, - symbol, + fqsn, # form: ... book ) @@ -1090,7 +1100,7 @@ async def _emsd_main( # relay.brokerd_dialogue, brokerd_stream, - symbol, + fqsn, feed, dark_book, _router, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 9f4dbadb..26f44007 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -268,13 +268,14 @@ class OrderMode: ''' staged = self._staged_order - symbol = staged.symbol + symbol: Symbol = staged.symbol oid = str(uuid.uuid4()) # format order data for ems + fqsn = symbol.front_fqsn() order = staged.copy( update={ - 'symbol': symbol.key, + 'symbol': fqsn, 'oid': oid, } ) @@ -519,8 +520,7 @@ async def open_order_mode( feed: Feed, chart: 'ChartPlotWidget', # noqa - symbol: Symbol, - brokername: str, + fqsn: str, started: trio.Event, ) -> None: @@ -546,8 +546,7 @@ async def open_order_mode( # spawn EMS actor-service async with ( - - open_ems(brokername, symbol) as ( + open_ems(fqsn) as ( book, trades_stream, position_msgs, @@ -556,8 +555,7 @@ async def open_order_mode( trio.open_nursery() as tn, ): - log.info(f'Opening order mode for {brokername}.{symbol.key}') - + log.info(f'Opening order mode for {fqsn}') view = chart.view # annotations editors @@ -566,7 +564,7 @@ async def open_order_mode( # symbol id symbol = chart.linked.symbol - symkey = symbol.key + symkey = symbol.front_fqsn() # map of per-provider account keys to position tracker instances trackers: dict[str, PositionTracker] = {} @@ -610,7 +608,7 @@ async def open_order_mode( log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') startup_pp.update_from_msg(msg) - # allocator + # allocator config alloc = mk_allocator( symbol=symbol, account=account_name, @@ -818,8 +816,10 @@ async def process_trades_and_update_ui( 'position', ): sym = mode.chart.linked.symbol - if msg['symbol'].lower() in sym.key: + symbol = msg['symbol'].lower() + fqsn = sym.front_fqsn() + if symbol in fqsn: tracker = mode.trackers[msg['account']] tracker.live_pp.update_from_msg(msg) # update order pane widgets