diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index ee176f87..01196f41 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -27,7 +27,7 @@ import trio import tractor from tractor.trionics import broadcast_receiver -from ..accounting._mktinfo import unpack_fqsn +from ..accounting._mktinfo import unpack_fqme from ..log import get_logger from ..data.types import Struct from ..service import maybe_open_emsd @@ -177,7 +177,7 @@ async def relay_order_cmds_from_sync_code( @acm async def open_ems( - fqsn: str, + fqme: str, mode: str = 'live', loglevel: str = 'error', @@ -229,7 +229,7 @@ async def open_ems( # ready for order commands book = get_orders() - broker, symbol, suffix = unpack_fqsn(fqsn) + broker, symbol, suffix = unpack_fqme(fqme) async with maybe_open_emsd(broker) as portal: @@ -246,7 +246,7 @@ async def open_ems( portal.open_context( _emsd_main, - fqsn=fqsn, + fqme=fqme, exec_mode=mode, loglevel=loglevel, @@ -266,7 +266,7 @@ async def open_ems( async with trio.open_nursery() as n: n.start_soon( relay_order_cmds_from_sync_code, - fqsn, + fqme, trades_stream ) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 4a735a4e..24d491c5 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -44,7 +44,7 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks from ..accounting._mktinfo import ( - unpack_fqsn, + unpack_fqme, float_digits, ) from ..data.feed import ( @@ -156,7 +156,7 @@ async def clear_dark_triggers( brokerd_orders_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa broker: str, - fqsn: str, + fqme: str, book: DarkBook, @@ -231,7 +231,7 @@ async def clear_dark_triggers( account=account, size=size, ): - bfqsn: str = symbol.replace(f'.{broker}', '') + bfqme: str = symbol.replace(f'.{broker}', '') submit_price = price + abs_diff_away resp = 'triggered' # hidden on client-side @@ -244,7 +244,7 @@ async def clear_dark_triggers( oid=oid, account=account, time_ns=time.time_ns(), - symbol=bfqsn, + symbol=bfqme, price=submit_price, size=size, ) @@ -287,14 +287,14 @@ async def clear_dark_triggers( # send response to client-side await router.client_broadcast( - fqsn, + fqme, status, ) else: # condition scan loop complete log.debug(f'execs are {execs}') if execs: - book.triggers[fqsn] = execs + book.triggers[fqme] = execs # print(f'execs scan took: {time.time() - start}') @@ -335,7 +335,7 @@ class Router(Struct): # sets of clients mapped from subscription keys subscribers: defaultdict[ - str, # sub key, default fqsn + str, # sub key, default fqme set[tractor.MsgStream], # unique client streams ] = defaultdict(set) @@ -424,7 +424,7 @@ class Router(Struct): # actor to simulate the real IPC load it'll have when also # pulling data from feeds open_trades_endpoint = paper.open_paperboi( - fqsn='.'.join([symbol, broker]), + fqme='.'.join([symbol, broker]), loglevel=loglevel, ) @@ -506,7 +506,7 @@ class Router(Struct): async def open_trade_relays( self, - fqsn: str, + fqme: str, exec_mode: str, loglevel: str, @@ -520,24 +520,24 @@ class Router(Struct): none already exists. ''' - broker, symbol, suffix = unpack_fqsn(fqsn) + broker, symbol, suffix = unpack_fqme(fqme) async with ( maybe_open_feed( - [fqsn], + [fqme], loglevel=loglevel, ) as feed, ): - brokername, _, _ = unpack_fqsn(fqsn) + brokername, _, _ = unpack_fqme(fqme) brokermod = feed.mods[brokername] broker = brokermod.name portal = feed.portals[brokermod] # XXX: this should be initial price quote from target provider - flume = feed.flumes[fqsn] + flume = feed.flumes[fqme] first_quote: dict = flume.first_quote book: DarkBook = self.get_dark_book(broker) - book.lasts[fqsn]: float = first_quote['last'] + book.lasts[fqme]: float = first_quote['last'] async with self.maybe_open_brokerd_dialog( brokermod=brokermod, @@ -556,7 +556,7 @@ class Router(Struct): relay.brokerd_stream, flume.stream, broker, - fqsn, # form: ... + fqme, # form: ... book ) @@ -945,7 +945,7 @@ async def translate_and_relay_brokerd_events( # may end up with collisions? status_msg = Status(**brokerd_msg) - # NOTE: be sure to pack an fqsn for the client side! + # NOTE: be sure to pack an fqme for the client side! order = Order(**status_msg.req) order.symbol = f'{order.symbol}.{broker}' @@ -1022,7 +1022,7 @@ async def process_client_order_cmds( client_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream, - fqsn: str, + fqme: str, flume: Flume, dark_book: DarkBook, router: Router, @@ -1049,11 +1049,11 @@ async def process_client_order_cmds( # backend can be routed and relayed to subscribed clients. subs = router.dialogs[oid] - # add all subscribed clients for this fqsn (should eventually be + # add all subscribed clients for this fqme (should eventually be # a more generalize subscription system) to received order msg # updates (and thus show stuff in the UI). subs.add(client_order_stream) - subs.update(router.subscribers[fqsn]) + subs.update(router.subscribers[fqme]) reqid = dark_book._ems2brokerd_ids.inverse.get(oid) @@ -1111,7 +1111,7 @@ async def process_client_order_cmds( and status.resp == 'dark_open' ): # remove from dark book clearing - entry = dark_book.triggers[fqsn].pop(oid, None) + entry = dark_book.triggers[fqme].pop(oid, None) if entry: ( pred, @@ -1127,7 +1127,7 @@ async def process_client_order_cmds( status.req = cmd await router.client_broadcast( - fqsn, + fqme, status, ) @@ -1137,7 +1137,7 @@ async def process_client_order_cmds( dark_book._active.pop(oid) else: - log.exception(f'No dark order for {fqsn}?') + log.exception(f'No dark order for {fqme}?') # TODO: eventually we should be receiving # this struct on the wire unpacked in a scoped protocol @@ -1146,7 +1146,7 @@ async def process_client_order_cmds( # LIVE order REQUEST case { 'oid': oid, - 'symbol': fqsn, + 'symbol': fqme, 'price': trigger_price, 'size': size, 'action': ('buy' | 'sell') as action, @@ -1159,7 +1159,7 @@ async def process_client_order_cmds( # remove the broker part before creating a message # to send to the specific broker since they probably # aren't expectig their own name, but should they? - sym = fqsn.replace(f'.{broker}', '') + sym = fqme.replace(f'.{broker}', '') if status is not None: # if we already had a broker order id then @@ -1216,7 +1216,7 @@ async def process_client_order_cmds( # DARK-order / alert REQUEST case { 'oid': oid, - 'symbol': fqsn, + 'symbol': fqme, 'price': trigger_price, 'size': size, 'exec_mode': exec_mode, @@ -1238,7 +1238,7 @@ async def process_client_order_cmds( # price received from the feed, instead of being # like every other shitty tina platform that makes # the user choose the predicate operator. - last = dark_book.lasts[fqsn] + last = dark_book.lasts[fqme] # sometimes the real-time feed hasn't come up # so just pull from the latest history. @@ -1280,7 +1280,7 @@ async def process_client_order_cmds( # NOTE: this may result in an override of an existing # dark book entry if the order id already exists dark_book.triggers.setdefault( - fqsn, {} + fqme, {} )[oid] = ( pred, tickfilter, @@ -1305,7 +1305,7 @@ async def process_client_order_cmds( # broadcast status to all subscribed clients await router.client_broadcast( - fqsn, + fqme, status, ) @@ -1316,35 +1316,36 @@ async def process_client_order_cmds( @acm async def maybe_open_trade_relays( router: Router, - fqsn: str, + fqme: str, exec_mode: str, # ('paper', 'live') loglevel: str = 'info', ) -> tuple: - def cache_on_fqsn_unless_paper( + def cache_on_fqme_unless_paper( router: Router, - fqsn: str, + fqme: str, exec_mode: str, # ('paper', 'live') loglevel: str = 'info', ) -> Hashable: if exec_mode == 'paper': - return f'paper_{fqsn}' + return f'paper_{fqme}' else: - return fqsn + return fqme # XXX: closure to enable below use of # ``tractor.trionics.maybe_open_context()`` @acm async def cached_mngr( router: Router, - fqsn: str, + fqme: str, exec_mode: str, # ('paper', 'live') loglevel: str = 'info', ): + relay, feed, client_ready = await _router.nursery.start( _router.open_trade_relays, - fqsn, + fqme, exec_mode, loglevel, ) @@ -1354,11 +1355,11 @@ async def maybe_open_trade_relays( acm_func=cached_mngr, kwargs={ 'router': _router, - 'fqsn': fqsn, + 'fqme': fqme, 'exec_mode': exec_mode, 'loglevel': loglevel, }, - key=cache_on_fqsn_unless_paper, + key=cache_on_fqme_unless_paper, ) as ( cache_hit, (relay, feed, client_ready) @@ -1369,7 +1370,7 @@ async def maybe_open_trade_relays( @tractor.context async def _emsd_main( ctx: tractor.Context, - fqsn: str, + fqme: str, exec_mode: str, # ('paper', 'live') loglevel: str = 'info', @@ -1426,7 +1427,7 @@ async def _emsd_main( global _router assert _router - broker, symbol, suffix = unpack_fqsn(fqsn) + broker, symbol, suffix = unpack_fqme(fqme) # TODO: would be nice if in tractor we can require either a ctx arg, # or a named arg with ctx in it and a type annotation of @@ -1443,7 +1444,7 @@ async def _emsd_main( # few duplicate streams as necessary per ems actor. async with maybe_open_trade_relays( _router, - fqsn, + fqme, exec_mode, loglevel, ) as (relay, feed, client_ready): @@ -1466,28 +1467,28 @@ async def _emsd_main( # register the client side before starting the # brokerd-side relay task to ensure the client is # delivered all exisiting open orders on startup. - # TODO: instead of by fqsn we need a subscription + # TODO: instead of by fqme we need a subscription # system/schema here to limit what each new client is # allowed to see in terms of broadcasted order flow # updates per dialog. - _router.subscribers[fqsn].add(client_stream) + _router.subscribers[fqme].add(client_stream) client_ready.set() # start inbound (from attached client) order request processing # main entrypoint, run here until cancelled. try: - flume = feed.flumes[fqsn] + flume = feed.flumes[fqme] await process_client_order_cmds( client_stream, brokerd_stream, - fqsn, + fqme, flume, dark_book, _router, ) finally: # try to remove client from subscription registry - _router.subscribers[fqsn].remove(client_stream) + _router.subscribers[fqme].remove(client_stream) for oid, client_streams in _router.dialogs.items(): client_streams.discard(client_stream)