diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index f4732e54..5034aca6 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -18,8 +18,11 @@ Binance backend """ -from contextlib import asynccontextmanager -from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator +from contextlib import asynccontextmanager as acm +from typing import ( + Any, Union, Optional, + AsyncGenerator, Callable, +) import time import trio @@ -88,7 +91,7 @@ class Pair(BaseModel): baseCommissionPrecision: int quoteCommissionPrecision: int - orderTypes: List[str] + orderTypes: list[str] icebergAllowed: bool ocoAllowed: bool @@ -96,8 +99,8 @@ class Pair(BaseModel): isSpotTradingAllowed: bool isMarginTradingAllowed: bool - filters: List[Dict[str, Union[str, int, float]]] - permissions: List[str] + filters: list[dict[str, Union[str, int, float]]] + permissions: list[str] @dataclass @@ -145,7 +148,7 @@ class Client: self, method: str, params: dict, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: resp = await self._sesh.get( path=f'/api/v3/{method}', params=params, @@ -200,7 +203,7 @@ class Client: self, pattern: str, limit: int = None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: if self._pairs is not None: data = self._pairs else: @@ -273,7 +276,7 @@ class Client: return array -@asynccontextmanager +@acm async def get_client() -> Client: client = Client() await client.cache_symbols() @@ -353,7 +356,7 @@ async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: } -def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: +def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: """Create a request subscription packet dict. https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams @@ -368,6 +371,17 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: } +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('binance') as client: + yield client + + async def backfill_bars( sym: str, shm: ShmArray, # type: ignore # noqa @@ -385,12 +399,12 @@ async def backfill_bars( async def stream_quotes( send_chan: trio.abc.SendChannel, - symbols: List[str], + symbols: list[str], feed_is_live: trio.Event, loglevel: str = None, # startup sync - task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: # XXX: required to propagate ``tractor`` loglevel to piker logging @@ -427,10 +441,11 @@ async def stream_quotes( symbol: { 'symbol_info': sym_infos[sym], 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, }, } - @asynccontextmanager + @acm async def subscribe(ws: wsproto.WSConnection): # setup subs @@ -480,8 +495,7 @@ async def stream_quotes( # TODO: use ``anext()`` when it lands in 3.10! typ, quote = await msg_gen.__anext__() - first_quote = {quote['symbol'].lower(): quote} - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, quote)) # signal to caller feed is ready for consumption feed_is_live.set() diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index ab440ffe..3431dfd6 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1472,6 +1472,7 @@ async def stream_quotes( return init_msgs init_msgs = mk_init_msgs() + con = first_ticker.contract # should be real volume for this contract by default @@ -1496,8 +1497,11 @@ async def stream_quotes( topic = '.'.join((con['symbol'], suffix)).lower() quote['symbol'] = topic + # for compat with upcoming fqsn based derivs search + init_msgs[sym]['fqsn'] = topic + # pass first quote asap - first_quote = {topic: quote} + first_quote = quote # it might be outside regular trading hours so see if we can at # least grab history. diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index f64ef7aa..4f5166db 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -18,9 +18,9 @@ Kraken backend. ''' -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from dataclasses import asdict, field -from typing import Dict, List, Tuple, Any, Optional, AsyncIterator +from typing import Any, Optional, AsyncIterator, Callable import time from trio_typing import TaskStatus @@ -80,7 +80,7 @@ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = True -_symbol_info_translation: Dict[str, str] = { +_symbol_info_translation: dict[str, str] = { 'tick_decimals': 'pair_decimals', } @@ -102,16 +102,16 @@ class Pair(BaseModel): lot_multiplier: float # array of leverage amounts available when buying - leverage_buy: List[int] + leverage_buy: list[int] # array of leverage amounts available when selling - leverage_sell: List[int] + leverage_sell: list[int] # fee schedule array in [volume, percent fee] tuples - fees: List[Tuple[int, float]] + fees: list[tuple[int, float]] # maker fee schedule array in [volume, percent fee] tuples (if on # maker/taker) - fees_maker: List[Tuple[int, float]] + fees_maker: list[tuple[int, float]] fee_volume_currency: str # volume discount currency margin_call: str # margin call level @@ -153,7 +153,7 @@ class OHLC: volume: float # Accumulated volume **within interval** count: int # Number of trades within interval # (sampled) generated tick data - ticks: List[Any] = field(default_factory=list) + ticks: list[Any] = field(default_factory=list) def get_config() -> dict[str, Any]: @@ -177,7 +177,7 @@ def get_config() -> dict[str, Any]: def get_kraken_signature( urlpath: str, - data: Dict[str, Any], + data: dict[str, Any], secret: str ) -> str: postdata = urllib.parse.urlencode(data) @@ -220,7 +220,7 @@ class Client: self._secret = secret @property - def pairs(self) -> Dict[str, Any]: + def pairs(self) -> dict[str, Any]: if self._pairs is None: raise RuntimeError( "Make sure to run `cache_symbols()` on startup!" @@ -233,7 +233,7 @@ class Client: self, method: str, data: dict, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: resp = await self._sesh.post( path=f'/public/{method}', json=data, @@ -246,7 +246,7 @@ class Client: method: str, data: dict, uri_path: str - ) -> Dict[str, Any]: + ) -> dict[str, Any]: headers = { 'Content-Type': 'application/x-www-form-urlencoded', @@ -266,16 +266,16 @@ class Client: async def endpoint( self, method: str, - data: Dict[str, Any] - ) -> Dict[str, Any]: + data: dict[str, Any] + ) -> dict[str, Any]: uri_path = f'/0/private/{method}' data['nonce'] = str(int(1000*time.time())) return await self._private(method, data, uri_path) async def get_trades( self, - data: Dict[str, Any] = {} - ) -> Dict[str, Any]: + data: dict[str, Any] = {} + ) -> dict[str, Any]: data['ofs'] = 0 # Grab all trade history # https://docs.kraken.com/rest/#operation/getTradeHistory @@ -378,7 +378,7 @@ class Client: self, pattern: str, limit: int = None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: if self._pairs is not None: data = self._pairs else: @@ -452,7 +452,7 @@ class Client: raise SymbolNotFound(json['error'][0] + f': {symbol}') -@asynccontextmanager +@acm async def get_client() -> Client: section = get_config() @@ -521,7 +521,7 @@ def normalize_symbol( return ticker.lower() -def make_auth_sub(data: Dict[str, Any]) -> Dict[str, str]: +def make_auth_sub(data: dict[str, Any]) -> dict[str, str]: ''' Create a request subscription packet dict. @@ -696,12 +696,12 @@ async def handle_order_requests( async def trades_dialogue( ctx: tractor.Context, loglevel: str = None, -) -> AsyncIterator[Dict[str, Any]]: +) -> AsyncIterator[dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - @asynccontextmanager + @acm async def subscribe(ws: wsproto.WSConnection, token: str): # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe @@ -980,7 +980,7 @@ def normalize( return topic, quote -def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: +def make_sub(pairs: list[str], data: dict[str, Any]) -> dict[str, str]: ''' Create a request subscription packet dict. @@ -996,6 +996,17 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: } +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + # TODO implement history getter for the new storage layer. + async with open_cached_client('kraken') as client: + yield client + + async def backfill_bars( sym: str, @@ -1017,7 +1028,7 @@ async def backfill_bars( async def stream_quotes( send_chan: trio.abc.SendChannel, - symbols: List[str], + symbols: list[str], feed_is_live: trio.Event, loglevel: str = None, @@ -1025,7 +1036,7 @@ async def stream_quotes( sub_type: str = 'ohlc', # startup sync - task_status: TaskStatus[Tuple[Dict, Dict]] = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -1064,10 +1075,11 @@ async def stream_quotes( symbol: { 'symbol_info': sym_infos[sym], 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym, }, } - @asynccontextmanager + @acm async def subscribe(ws: wsproto.WSConnection): # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe @@ -1121,8 +1133,7 @@ async def stream_quotes( topic, quote = normalize(ohlc_last) - first_quote = {topic: quote} - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, quote)) # lol, only "closes" when they're margin squeezing clients ;P feed_is_live.set() diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 84983808..837c28bc 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -18,7 +18,7 @@ Orders and execution client API. """ -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from typing import Dict from pprint import pformat from dataclasses import dataclass, field @@ -27,7 +27,6 @@ import trio import tractor from tractor.trionics import broadcast_receiver -from ..data._source import Symbol from ..log import get_logger from ._ems import _emsd_main from .._daemon import maybe_open_emsd @@ -156,16 +155,19 @@ async def relay_order_cmds_from_sync_code( await to_ems_stream.send(cmd) -@asynccontextmanager +@acm async def open_ems( - broker: str, - symbol: Symbol, + fqsn: str, -) -> (OrderBook, tractor.MsgStream, dict): - """Spawn an EMS daemon and begin sending orders and receiving +) -> ( + OrderBook, + tractor.MsgStream, + dict, +): + ''' + Spawn an EMS daemon and begin sending orders and receiving alerts. - This EMS tries to reduce most broker's terrible order entry apis to a very simple protocol built on a few easy to grok and/or "rantsy" premises: @@ -194,21 +196,22 @@ async def open_ems( - 'dark_executed', 'broker_executed' - 'broker_filled' - """ + ''' # wait for service to connect back to us signalling # ready for order commands book = get_orders() + from ..data._source import unpack_fqsn + broker, symbol, suffix = unpack_fqsn(fqsn) + async with maybe_open_emsd(broker) as portal: async with ( - # connect to emsd portal.open_context( _emsd_main, - broker=broker, - symbol=symbol.key, + fqsn=fqsn, ) as (ctx, (positions, accounts)), @@ -218,7 +221,7 @@ async def open_ems( async with trio.open_nursery() as n: n.start_soon( relay_order_cmds_from_sync_code, - symbol.key, + fqsn, trades_stream ) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 630405ea..c49ff4bf 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -20,7 +20,6 @@ In da suit parlances: "Execution management systems" """ from contextlib import asynccontextmanager from dataclasses import dataclass, field -from math import isnan from pprint import pformat import time from typing import AsyncIterator, Callable @@ -113,8 +112,8 @@ class _DarkBook: # tracks most recent values per symbol each from data feed lasts: dict[ - tuple[str, str], - float + str, + float, ] = field(default_factory=dict) # mapping of piker ems order ids to current brokerd order flow message @@ -135,7 +134,7 @@ async def clear_dark_triggers( ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa broker: str, - symbol: str, + fqsn: str, book: _DarkBook, @@ -155,7 +154,6 @@ async def clear_dark_triggers( # start = time.time() for sym, quote in quotes.items(): execs = book.orders.get(sym, {}) - for tick in iterticks( quote, # dark order price filter(s) @@ -171,7 +169,7 @@ async def clear_dark_triggers( ttype = tick['type'] # update to keep new cmds informed - book.lasts[(broker, symbol)] = price + book.lasts[sym] = price for oid, ( pred, @@ -196,6 +194,7 @@ async def clear_dark_triggers( action: str = cmd['action'] symbol: str = cmd['symbol'] + bfqsn: str = symbol.replace(f'.{broker}', '') if action == 'alert': # nothing to do but relay a status @@ -225,7 +224,7 @@ async def clear_dark_triggers( # order-request and instead create a new one. reqid=None, - symbol=sym, + symbol=bfqsn, price=submit_price, size=cmd['size'], ) @@ -247,12 +246,9 @@ async def clear_dark_triggers( oid=oid, # ems order id resp=resp, time_ns=time.time_ns(), - - symbol=symbol, + symbol=fqsn, trigger_price=price, - broker_details={'name': broker}, - cmd=cmd, # original request message ).dict() @@ -270,7 +266,7 @@ async def clear_dark_triggers( else: # condition scan loop complete log.debug(f'execs are {execs}') if execs: - book.orders[symbol] = execs + book.orders[fqsn] = execs # print(f'execs scan took: {time.time() - start}') @@ -382,7 +378,8 @@ async def open_brokerd_trades_dialogue( task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, ) -> tuple[dict, tractor.MsgStream]: - '''Open and yield ``brokerd`` trades dialogue context-stream if none + ''' + Open and yield ``brokerd`` trades dialogue context-stream if none already exists. ''' @@ -419,8 +416,7 @@ async def open_brokerd_trades_dialogue( # actor to simulate the real IPC load it'll have when also # pulling data from feeds open_trades_endpoint = paper.open_paperboi( - broker=broker, - symbol=symbol, + fqsn='.'.join([symbol, broker]), loglevel=loglevel, ) @@ -458,12 +454,13 @@ async def open_brokerd_trades_dialogue( # locally cache and track positions per account. pps = {} for msg in positions: + log.info(f'loading pp: {msg}') account = msg['account'] assert account in accounts pps.setdefault( - msg['symbol'], + f'{msg["symbol"]}.{broker}', {} )[account] = msg @@ -493,8 +490,9 @@ async def open_brokerd_trades_dialogue( finally: # parent context must have been closed # remove from cache so next client will respawn if needed - ## TODO: Maybe add a warning - _router.relays.pop(broker, None) + relay = _router.relays.pop(broker, None) + if not relay: + log.warning(f'Relay for {broker} was already removed!?') @tractor.context @@ -563,7 +561,13 @@ async def translate_and_relay_brokerd_events( # XXX: this will be useful for automatic strats yah? # keep pps per account up to date locally in ``emsd`` mem - relay.positions.setdefault(pos_msg['symbol'], {}).setdefault( + sym, broker = pos_msg['symbol'], pos_msg['broker'] + + relay.positions.setdefault( + # NOTE: translate to a FQSN! + f'{sym}.{broker}', + {} + ).setdefault( pos_msg['account'], {} ).update(pos_msg) @@ -840,11 +844,15 @@ async def process_client_order_cmds( msg = Order(**cmd) - sym = msg.symbol + fqsn = msg.symbol trigger_price = msg.price size = msg.size exec_mode = msg.exec_mode broker = msg.brokers[0] + # remove the broker part before creating a message + # to send to the specific broker since they probably + # aren't expectig their own name, but should they? + sym = fqsn.replace(f'.{broker}', '') if exec_mode == 'live' and action in ('buy', 'sell',): @@ -902,7 +910,7 @@ async def process_client_order_cmds( # price received from the feed, instead of being # like every other shitty tina platform that makes # the user choose the predicate operator. - last = dark_book.lasts[(broker, sym)] + last = dark_book.lasts[fqsn] pred = mk_check(trigger_price, last, action) spread_slap: float = 5 @@ -933,7 +941,7 @@ async def process_client_order_cmds( # dark book entry if the order id already exists dark_book.orders.setdefault( - sym, {} + fqsn, {} )[oid] = ( pred, tickfilter, @@ -960,8 +968,8 @@ async def process_client_order_cmds( async def _emsd_main( ctx: tractor.Context, - broker: str, - symbol: str, + fqsn: str, + _exec_mode: str = 'dark', # ('paper', 'dark', 'live') loglevel: str = 'info', @@ -1003,6 +1011,8 @@ async def _emsd_main( global _router assert _router + from ..data._source import unpack_fqsn + broker, symbol, suffix = unpack_fqsn(fqsn) dark_book = _router.get_dark_book(broker) # TODO: would be nice if in tractor we can require either a ctx arg, @@ -1015,22 +1025,16 @@ async def _emsd_main( # spawn one task per broker feed async with ( maybe_open_feed( - broker, - [symbol], + [fqsn], loglevel=loglevel, - ) as (feed, stream), + ) as (feed, quote_stream), ): # XXX: this should be initial price quote from target provider - first_quote = feed.first_quotes[symbol] + first_quote = feed.first_quotes[fqsn] book = _router.get_dark_book(broker) - last = book.lasts[(broker, symbol)] = first_quote['last'] - - # XXX: ib is a cucker but we've fixed avoiding receiving any - # `Nan`s in the backend during market hours (right?). this was - # here previously as a sanity check during market hours. - # assert not isnan(last) + book.lasts[fqsn] = first_quote['last'] # open a stream with the brokerd backend for order # flow dialogue @@ -1054,8 +1058,8 @@ async def _emsd_main( # flatten out collected pps from brokerd for delivery pp_msgs = { - sym: list(pps.values()) - for sym, pps in relay.positions.items() + fqsn: list(pps.values()) + for fqsn, pps in relay.positions.items() } # signal to client that we're started and deliver @@ -1072,9 +1076,9 @@ async def _emsd_main( brokerd_stream, ems_client_order_stream, - stream, + quote_stream, broker, - symbol, + fqsn, # form: ... book ) @@ -1090,7 +1094,7 @@ async def _emsd_main( # relay.brokerd_dialogue, brokerd_stream, - symbol, + fqsn, feed, dark_book, _router, diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index f87e2203..99039049 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -32,6 +32,7 @@ from dataclasses import dataclass from .. import data from ..data._normalize import iterticks +from ..data._source import unpack_fqsn from ..log import get_logger from ._messages import ( BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, @@ -446,7 +447,7 @@ async def trades_dialogue( ctx: tractor.Context, broker: str, - symbol: str, + fqsn: str, loglevel: str = None, ) -> None: @@ -455,8 +456,7 @@ async def trades_dialogue( async with ( data.open_feed( - broker, - [symbol], + [fqsn], loglevel=loglevel, ) as feed, @@ -491,15 +491,16 @@ async def trades_dialogue( @asynccontextmanager async def open_paperboi( - broker: str, - symbol: str, + fqsn: str, loglevel: str, ) -> Callable: - '''Spawn a paper engine actor and yield through access to + ''' + Spawn a paper engine actor and yield through access to its context. ''' + broker, symbol, expiry = unpack_fqsn(fqsn) service_name = f'paperboi.{broker}' async with ( @@ -518,7 +519,7 @@ async def open_paperboi( async with portal.open_context( trades_dialogue, broker=broker, - symbol=symbol, + fqsn=fqsn, loglevel=loglevel, ) as (ctx, first): diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index ecad241d..d31bf7b1 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -19,6 +19,8 @@ Sampling and broadcast machinery for (soft) real-time delivery of financial data flows. """ +from __future__ import annotations +from collections import Counter import time import tractor @@ -133,18 +135,20 @@ async def increment_ohlc_buffer( # a given sample period. subs = sampler.subscribers.get(delay_s, ()) - for ctx in subs: + for stream in subs: try: - await ctx.send_yield({'index': shm._last.value}) + await stream.send({'index': shm._last.value}) except ( trio.BrokenResourceError, trio.ClosedResourceError ): - log.error(f'{ctx.chan.uid} dropped connection') - subs.remove(ctx) + log.error( + f'{stream._ctx.chan.uid} dropped connection' + ) + subs.remove(stream) -@tractor.stream +@tractor.context async def iter_ohlc_periods( ctx: tractor.Context, delay_s: int, @@ -158,18 +162,20 @@ async def iter_ohlc_periods( ''' # add our subscription subs = sampler.subscribers.setdefault(delay_s, []) - subs.append(ctx) + await ctx.started() + async with ctx.open_stream() as stream: + subs.append(stream) - try: - # stream and block until cancelled - await trio.sleep_forever() - finally: try: - subs.remove(ctx) - except ValueError: - log.error( - f'iOHLC step stream was already dropped for {ctx.chan.uid}?' - ) + # stream and block until cancelled + await trio.sleep_forever() + finally: + try: + subs.remove(stream) + except ValueError: + log.error( + f'iOHLC step stream was already dropped {ctx.chan.uid}?' + ) async def sample_and_broadcast( @@ -177,17 +183,19 @@ async def sample_and_broadcast( bus: '_FeedsBus', # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, + brokername: str, sum_tick_vlm: bool = True, ) -> None: log.info("Started shared mem bar writer") + overruns = Counter() + # iterate stream delivered by broker async for quotes in quote_stream: # TODO: ``numba`` this! - for sym, quote in quotes.items(): - + for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that # will require at least some way to prevent task switching @@ -251,9 +259,15 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus._subscribers[sym.lower()] + subs = bus._subscribers[broker_symbol.lower()] + + # NOTE: by default the broker backend doesn't append + # it's own "name" into the fqsn schema (but maybe it + # should?) so we have to manually generate the correct + # key here. + bsym = f'{broker_symbol}.{brokername}' + lags: int = 0 - lags = 0 for (stream, tick_throttle) in subs: try: @@ -262,7 +276,9 @@ async def sample_and_broadcast( # this is a send mem chan that likely # pushes to the ``uniform_rate_send()`` below. try: - stream.send_nowait((sym, quote)) + stream.send_nowait( + (bsym, quote) + ) except trio.WouldBlock: ctx = getattr(stream, '_ctx', None) if ctx: @@ -271,12 +287,22 @@ async def sample_and_broadcast( f'{ctx.channel.uid} !!!' ) else: + key = id(stream) + overruns[key] += 1 log.warning( f'Feed overrun {bus.brokername} -> ' f'feed @ {tick_throttle} Hz' ) + if overruns[key] > 6: + log.warning( + f'Dropping consumer {stream}' + ) + await stream.aclose() + raise trio.BrokenResourceError else: - await stream.send({sym: quote}) + await stream.send( + {bsym: quote} + ) if cs.cancelled_caught: lags += 1 @@ -295,7 +321,7 @@ async def sample_and_broadcast( '`brokerd`-quotes-feed connection' ) if tick_throttle: - assert stream.closed() + assert stream._closed # XXX: do we need to deregister here # if it's done in the fee bus code? @@ -399,7 +425,16 @@ async def uniform_rate_send( # rate timing exactly lul try: await stream.send({sym: first_quote}) - except trio.ClosedResourceError: + except ( + # NOTE: any of these can be raised by ``tractor``'s IPC + # transport-layer and we want to be highly resilient + # to consumers which crash or lose network connection. + # I.e. we **DO NOT** want to crash and propagate up to + # ``pikerd`` these kinds of errors! + trio.ClosedResourceError, + trio.BrokenResourceError, + ConnectionResetError, + ): # if the feed consumer goes down then drop # out of this rate limiter log.warning(f'{stream} closed') diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 5f7fdcd0..103fa6d5 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -258,7 +258,7 @@ class ShmArray: if index < 0: raise ValueError( f'Array size of {self._len} was overrun during prepend.\n' - 'You have passed {abs(index)} too many datums.' + f'You have passed {abs(index)} too many datums.' ) end = index + length diff --git a/piker/data/_source.py b/piker/data/_source.py index dfa48453..3fa6db7b 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -17,6 +17,7 @@ """ numpy data source coversion helpers. """ +from __future__ import annotations from typing import Any import decimal @@ -91,6 +92,40 @@ def ohlc_zeros(length: int) -> np.ndarray: return np.zeros(length, dtype=base_ohlc_dtype) +def unpack_fqsn(fqsn: str) -> tuple[str, str, str]: + ''' + Unpack a fully-qualified-symbol-name to ``tuple``. + + ''' + venue = '' + suffix = '' + + # TODO: probably reverse the order of all this XD + tokens = fqsn.split('.') + if len(tokens) < 3: + # probably crypto + symbol, broker = tokens + return ( + broker, + symbol, + '', + ) + + elif len(tokens) > 3: + symbol, venue, suffix, broker = tokens + else: + symbol, venue, broker = tokens + suffix = '' + + # head, _, broker = fqsn.rpartition('.') + # symbol, _, suffix = head.rpartition('.') + return ( + broker, + '.'.join([symbol, venue]), + suffix, + ) + + class Symbol(BaseModel): """I guess this is some kinda container thing for dealing with all the different meta-data formats from brokers? @@ -98,24 +133,72 @@ class Symbol(BaseModel): Yah, i guess dats what it izz. """ key: str - type_key: str # {'stock', 'forex', 'future', ... etc.} - tick_size: float - lot_tick_size: float # "volume" precision as min step value - tick_size_digits: int - lot_size_digits: int + tick_size: float = 0.01 + lot_tick_size: float = 0.0 # "volume" precision as min step value + tick_size_digits: int = 2 + lot_size_digits: int = 0 + suffix: str = '' broker_info: dict[str, dict[str, Any]] = {} # specifies a "class" of financial instrument # ex. stock, futer, option, bond etc. + # @validate_arguments + @classmethod + def from_broker_info( + cls, + broker: str, + symbol: str, + info: dict[str, Any], + suffix: str = '', + + # XXX: like wtf.. + # ) -> 'Symbol': + ) -> None: + + tick_size = info.get('price_tick_size', 0.01) + lot_tick_size = info.get('lot_tick_size', 0.0) + + return Symbol( + key=symbol, + tick_size=tick_size, + lot_tick_size=lot_tick_size, + tick_size_digits=float_digits(tick_size), + lot_size_digits=float_digits(lot_tick_size), + suffix=suffix, + broker_info={broker: info}, + ) + + @classmethod + def from_fqsn( + cls, + fqsn: str, + info: dict[str, Any], + + # XXX: like wtf.. + # ) -> 'Symbol': + ) -> None: + broker, key, suffix = unpack_fqsn(fqsn) + return cls.from_broker_info( + broker, + key, + info=info, + suffix=suffix, + ) + + @property + def type_key(self) -> str: + return list(self.broker_info.values())[0]['asset_type'] + @property def brokers(self) -> list[str]: return list(self.broker_info.keys()) def nearest_tick(self, value: float) -> float: - """Return the nearest tick value based on mininum increment. + ''' + Return the nearest tick value based on mininum increment. - """ + ''' mult = 1 / self.tick_size return round(value * mult) / mult @@ -131,37 +214,44 @@ class Symbol(BaseModel): self.key, ) + def tokens(self) -> tuple[str]: + broker, key = self.front_feed() + if self.suffix: + return (key, self.suffix, broker) + else: + return (key, broker) + + def front_fqsn(self) -> str: + ''' + fqsn = "fully qualified symbol name" + + Basically the idea here is for all client-ish code (aka programs/actors + that ask the provider agnostic layers in the stack for data) should be + able to tell which backend / venue / derivative each data feed/flow is + from by an explicit string key of the current form: + + ... + + TODO: I have thoughts that we should actually change this to be + more like an "attr lookup" (like how the web should have done + urls, but marketting peeps ruined it etc. etc.): + + ... + + ''' + tokens = self.tokens() + fqsn = '.'.join(tokens) + return fqsn + def iterfqsns(self) -> list[str]: - return [ - mk_fqsn(self.key, broker) - for broker in self.broker_info.keys() - ] + keys = [] + for broker in self.broker_info.keys(): + fqsn = mk_fqsn(self.key, broker) + if self.suffix: + fqsn += f'.{self.suffix}' + keys.append(fqsn) - -@validate_arguments -def mk_symbol( - - key: str, - type_key: str, - tick_size: float = 0.01, - lot_tick_size: float = 0, - broker_info: dict[str, Any] = {}, - -) -> Symbol: - ''' - Create and return an instrument description for the - "symbol" named as ``key``. - - ''' - return Symbol( - key=key, - type_key=type_key, - tick_size=tick_size, - lot_tick_size=lot_tick_size, - tick_size_digits=float_digits(tick_size), - lot_size_digits=float_digits(lot_tick_size), - broker_info=broker_info, - ) + return keys def from_df( diff --git a/piker/data/feed.py b/piker/data/feed.py index e2e91d7b..260cab9b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -50,9 +50,8 @@ from ._sharedmem import ( from .ingest import get_ingestormod from ._source import ( base_iohlc_dtype, - mk_symbol, Symbol, - mk_fqsn, + unpack_fqsn, ) from ..ui import _search from ._sampling import ( @@ -191,10 +190,8 @@ async def _setup_persistent_brokerd( async def manage_history( mod: ModuleType, - shm: ShmArray, bus: _FeedsBus, - symbol: str, - we_opened_shm: bool, + fqsn: str, some_data_ready: trio.Event, feed_is_live: trio.Event, @@ -208,21 +205,28 @@ async def manage_history( buffer. ''' - task_status.started() + # (maybe) allocate shm array for this broker/symbol which will + # be used for fast near-term history capture and processing. + shm, opened = maybe_open_shm_array( + key=fqsn, - opened = we_opened_shm - # TODO: history validation - # assert opened, f'Persistent shm for {symbol} was already open?!' - # if not opened: - # raise RuntimeError("Persistent shm for sym was already open?!") + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + + # we expect the sub-actor to write + readonly=False, + ) if opened: - # ask broker backend for new history + log.info('No existing `marketstored` found..') # start history backfill task ``backfill_bars()`` is # a required backend func this must block until shm is # filled with first set of ohlc bars - cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) + _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm) + + # yield back after client connect with filled shm + task_status.started(shm) # indicate to caller that feed can be delivered to # remote requesting client since we've loaded history @@ -243,13 +247,12 @@ async def manage_history( # start shm incrementing for OHLC sampling at the current # detected sampling period if one dne. if sampler.incrementers.get(delay_s) is None: - cs = await bus.start_task( + await bus.start_task( increment_ohlc_buffer, delay_s, ) await trio.sleep_forever() - cs.cancel() async def allocate_persistent_feed( @@ -279,20 +282,6 @@ async def allocate_persistent_feed( except ImportError: mod = get_ingestormod(brokername) - fqsn = mk_fqsn(brokername, symbol) - - # (maybe) allocate shm array for this broker/symbol which will - # be used for fast near-term history capture and processing. - shm, opened = maybe_open_shm_array( - key=fqsn, - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), - - # we expect the sub-actor to write - readonly=False, - ) - # mem chan handed to broker backend so it can push real-time # quotes to this task for sampling and history storage (see below). send, quote_stream = trio.open_memory_channel(10) @@ -301,30 +290,9 @@ async def allocate_persistent_feed( some_data_ready = trio.Event() feed_is_live = trio.Event() - # run 2 tasks: - # - a history loader / maintainer - # - a real-time streamer which consumers and sends new data to any - # consumers as well as writes to storage backends (as configured). - - # XXX: neither of these will raise but will cause an inf hang due to: - # https://github.com/python-trio/trio/issues/2258 - # bus.nursery.start_soon( - # await bus.start_task( - - await bus.nursery.start( - manage_history, - mod, - shm, - bus, - symbol, - opened, - some_data_ready, - feed_is_live, - ) - # establish broker backend quote stream by calling # ``stream_quotes()``, which is a required broker backend endpoint. - init_msg, first_quotes = await bus.nursery.start( + init_msg, first_quote = await bus.nursery.start( partial( mod.stream_quotes, send_chan=send, @@ -333,11 +301,39 @@ async def allocate_persistent_feed( loglevel=loglevel, ) ) + # the broker-specific fully qualified symbol name, + # but ensure it is lower-cased for external use. + bfqsn = init_msg[symbol]['fqsn'].lower() + init_msg[symbol]['fqsn'] = bfqsn + + # HISTORY, run 2 tasks: + # - a history loader / maintainer + # - a real-time streamer which consumers and sends new data to any + # consumers as well as writes to storage backends (as configured). + + # XXX: neither of these will raise but will cause an inf hang due to: + # https://github.com/python-trio/trio/issues/2258 + # bus.nursery.start_soon( + # await bus.start_task( + shm = await bus.nursery.start( + manage_history, + mod, + bus, + bfqsn, + some_data_ready, + feed_is_live, + ) # we hand an IPC-msg compatible shm token to the caller so it # can read directly from the memory which will be written by # this task. - init_msg[symbol]['shm_token'] = shm.token + msg = init_msg[symbol] + msg['shm_token'] = shm.token + + # true fqsn + fqsn = '.'.join((bfqsn, brokername)) + # add a fqsn entry that includes the ``.`` suffix + init_msg[fqsn] = msg # TODO: pretty sure we don't need this? why not just leave 1s as # the fastest "sample period" since we'll probably always want that @@ -350,8 +346,22 @@ async def allocate_persistent_feed( log.info(f'waiting on history to load: {fqsn}') await some_data_ready.wait() - bus.feeds[symbol.lower()] = (init_msg, first_quotes) - task_status.started((init_msg, first_quotes)) + # append ``.`` suffix to each quote symbol + bsym = symbol + f'.{brokername}' + generic_first_quotes = { + bsym: first_quote, + fqsn: first_quote, + } + + bus.feeds[symbol] = bus.feeds[fqsn] = ( + init_msg, + generic_first_quotes, + ) + # for ambiguous names we simply apply the retreived + # feed to that name (for now). + + # task_status.started((init_msg, generic_first_quotes)) + task_status.started() # backend will indicate when real-time quotes have begun. await feed_is_live.wait() @@ -366,10 +376,11 @@ async def allocate_persistent_feed( bus, shm, quote_stream, + brokername, sum_tick_vlm ) finally: - log.warning(f'{symbol}@{brokername} feed task terminated') + log.warning(f'{fqsn} feed task terminated') @tractor.context @@ -402,25 +413,16 @@ async def open_feed_bus( assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) - bus._subscribers.setdefault(symbol, []) - fqsn = mk_fqsn(brokername, symbol) - - entry = bus.feeds.get(symbol) # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery + entry = bus.feeds.get(symbol) if entry is None: - if not start_stream: - raise RuntimeError( - f'No stream feed exists for {fqsn}?\n' - f'You may need a `brokerd` started first.' - ) - - # allocate a new actor-local stream bus which will persist for - # this `brokerd`. + # allocate a new actor-local stream bus which + # will persist for this `brokerd`. async with bus.task_lock: - init_msg, first_quotes = await bus.nursery.start( + await bus.nursery.start( partial( allocate_persistent_feed, @@ -442,9 +444,30 @@ async def open_feed_bus( # subscriber init_msg, first_quotes = bus.feeds[symbol] + msg = init_msg[symbol] + bfqsn = msg['fqsn'].lower() + + # true fqsn + fqsn = '.'.join([bfqsn, brokername]) + assert fqsn in first_quotes + assert bus.feeds[fqsn] + + # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib) + bsym = symbol + f'.{brokername}' + assert bsym in first_quotes + + # we use the broker-specific fqsn (bfqsn) for + # the sampler subscription since the backend isn't (yet) + # expected to append it's own name to the fqsn, so we filter + # on keys which *do not* include that name (e.g .ib) . + bus._subscribers.setdefault(bfqsn, []) + # send this even to subscribers to existing feed? # deliver initial info message a first quote asap - await ctx.started((init_msg, first_quotes)) + await ctx.started(( + init_msg, + first_quotes, + )) if not start_stream: log.warning(f'Not opening real-time stream for {fqsn}') @@ -454,12 +477,15 @@ async def open_feed_bus( async with ( ctx.open_stream() as stream, ): + # re-send to trigger display loop cycle (necessary especially + # when the mkt is closed and no real-time messages are + # expected). + await stream.send({fqsn: first_quotes}) + + # open a bg task which receives quotes over a mem chan + # and only pushes them to the target actor-consumer at + # a max ``tick_throttle`` instantaneous rate. if tick_throttle: - - # open a bg task which receives quotes over a mem chan - # and only pushes them to the target actor-consumer at - # a max ``tick_throttle`` instantaneous rate. - send, recv = trio.open_memory_channel(2**10) cs = await bus.start_task( uniform_rate_send, @@ -472,12 +498,15 @@ async def open_feed_bus( else: sub = (stream, tick_throttle) - subs = bus._subscribers[symbol] + subs = bus._subscribers[bfqsn] subs.append(sub) try: uid = ctx.chan.uid + # ctrl protocol for start/stop of quote streams based on UI + # state (eg. don't need a stream when a symbol isn't being + # displayed). async for msg in stream: if msg == 'pause': @@ -502,7 +531,7 @@ async def open_feed_bus( # n.cancel_scope.cancel() cs.cancel() try: - bus._subscribers[symbol].remove(sub) + bus._subscribers[bfqsn].remove(sub) except ValueError: log.warning(f'{sub} for {symbol} was already removed?') @@ -519,19 +548,20 @@ async def open_sample_step_stream( # created for all practical purposes async with maybe_open_context( acm_func=partial( - portal.open_stream_from, + portal.open_context, iter_ohlc_periods, ), kwargs={'delay_s': delay_s}, - ) as (cache_hit, istream): - if cache_hit: - # add a new broadcast subscription for the quote stream - # if this feed is likely already in use - async with istream.subscribe() as bistream: - yield bistream - else: - yield istream + ) as (cache_hit, (ctx, first)): + async with ctx.open_stream() as istream: + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with istream.subscribe() as bistream: + yield bistream + else: + yield istream @dataclass @@ -627,10 +657,10 @@ async def install_brokerd_search( @asynccontextmanager async def open_feed( - brokername: str, - symbols: list[str], - loglevel: Optional[str] = None, + fqsns: list[str], + + loglevel: Optional[str] = None, backpressure: bool = True, start_stream: bool = True, tick_throttle: Optional[float] = None, # Hz @@ -640,7 +670,10 @@ async def open_feed( Open a "data feed" which provides streamed real-time quotes. ''' - sym = symbols[0].lower() + fqsn = fqsns[0].lower() + + brokername, key, suffix = unpack_fqsn(fqsn) + bfqsn = fqsn.replace('.' + brokername, '') try: mod = get_brokermod(brokername) @@ -661,7 +694,7 @@ async def open_feed( portal.open_context( open_feed_bus, brokername=brokername, - symbol=sym, + symbol=bfqsn, loglevel=loglevel, start_stream=start_stream, tick_throttle=tick_throttle, @@ -678,9 +711,10 @@ async def open_feed( ): # we can only read from shm shm = attach_shm_array( - token=init_msg[sym]['shm_token'], + token=init_msg[bfqsn]['shm_token'], readonly=True, ) + assert fqsn in first_quotes feed = Feed( name=brokername, @@ -693,17 +727,15 @@ async def open_feed( ) for sym, data in init_msg.items(): - si = data['symbol_info'] - - symbol = mk_symbol( - key=sym, - type_key=si.get('asset_type', 'forex'), - tick_size=si.get('price_tick_size', 0.01), - lot_tick_size=si.get('lot_tick_size', 0.0), + fqsn = data['fqsn'] + f'.{brokername}' + symbol = Symbol.from_fqsn( + fqsn, + info=si, ) - symbol.broker_info[brokername] = si + # symbol.broker_info[brokername] = si + feed.symbols[fqsn] = symbol feed.symbols[sym] = symbol # cast shm dtype to list... can't member why we need this @@ -727,26 +759,27 @@ async def open_feed( @asynccontextmanager async def maybe_open_feed( - brokername: str, - symbols: list[str], + fqsns: list[str], loglevel: Optional[str] = None, **kwargs, -) -> (Feed, ReceiveChannel[dict[str, Any]]): +) -> ( + Feed, + ReceiveChannel[dict[str, Any]], +): ''' Maybe open a data to a ``brokerd`` daemon only if there is no local one for the broker-symbol pair, if one is cached use it wrapped in a tractor broadcast receiver. ''' - sym = symbols[0].lower() + fqsn = fqsns[0] async with maybe_open_context( acm_func=open_feed, kwargs={ - 'brokername': brokername, - 'symbols': [sym], + 'fqsns': fqsns, 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), @@ -754,11 +787,12 @@ async def maybe_open_feed( 'backpressure': kwargs.get('backpressure', True), 'start_stream': kwargs.get('start_stream', True), }, - key=sym, + key=fqsn, + ) as (cache_hit, feed): if cache_hit: - log.info(f'Using cached feed for {brokername}.{sym}') + log.info(f'Using cached feed for {fqsn}') # add a new broadcast subscription for the quote stream # if this feed is likely already in use async with feed.stream.subscribe() as bstream: diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index f1dd49d7..7e75c283 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -37,6 +37,7 @@ from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray +from ..data._source import Symbol from ._api import ( Fsp, _load_builtins, @@ -76,7 +77,7 @@ async def filter_quotes_by_sym( async def fsp_compute( ctx: tractor.Context, - symbol: str, + symbol: Symbol, feed: Feed, quote_stream: trio.abc.ReceiveChannel, @@ -95,13 +96,14 @@ async def fsp_compute( disabled=True ) + fqsn = symbol.front_fqsn() out_stream = func( # TODO: do we even need this if we do the feed api right? # shouldn't a local stream do this before we get a handle # to the async iterable? it's that or we do some kinda # async itertools style? - filter_quotes_by_sym(symbol, quote_stream), + filter_quotes_by_sym(fqsn, quote_stream), # XXX: currently the ``ohlcv`` arg feed.shm, @@ -235,8 +237,7 @@ async def cascade( ctx: tractor.Context, # data feed key - brokername: str, - symbol: str, + fqsn: str, src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], @@ -289,8 +290,7 @@ async def cascade( # open a data feed stream with requested broker async with data.feed.maybe_open_feed( - brokername, - [symbol], + [fqsn], # TODO throttle tick outputs from *this* daemon since # it'll emit tons of ticks due to the throttle only @@ -299,6 +299,7 @@ async def cascade( # tick_throttle=60, ) as (feed, quote_stream): + symbol = feed.symbols[fqsn] profiler(f'{func}: feed up') diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 2a3689a3..7938e0d8 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -239,7 +239,7 @@ class GodWidget(QWidget): symbol = linkedsplits.symbol if symbol is not None: self.window.setWindowTitle( - f'{symbol.key}@{symbol.brokers} ' + f'{symbol.front_fqsn()} ' f'tick:{symbol.tick_size}' ) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index c6d0f5aa..9957baa4 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -211,7 +211,6 @@ async def graphics_update_loop( # async for quotes in iter_drain_quotes(): async for quotes in stream: - quote_period = time.time() - last_quote quote_rate = round( 1/quote_period, 1) if quote_period > 0 else float('inf') @@ -480,24 +479,23 @@ async def display_symbol_data( # clear_on_next=True, # group_key=loading_sym_key, # ) - async with open_feed( - provider, - [sym], - loglevel=loglevel, + ['.'.join((sym, provider))], + loglevel=loglevel, - # limit to at least display's FPS - # avoiding needless Qt-in-guest-mode context switches - tick_throttle=_quote_throttle_rate, + # limit to at least display's FPS + # avoiding needless Qt-in-guest-mode context switches + tick_throttle=_quote_throttle_rate, ) as feed: ohlcv: ShmArray = feed.shm bars = ohlcv.array symbol = feed.symbols[sym] + fqsn = symbol.front_fqsn() # load in symbol's ohlc data godwidget.window.setWindowTitle( - f'{symbol.key}@{symbol.brokers} ' + f'{fqsn} ' f'tick:{symbol.tick_size} ' f'step:1s ' ) @@ -582,8 +580,7 @@ async def display_symbol_data( open_order_mode( feed, chart, - symbol, - provider, + fqsn, order_mode_started ) ): diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index ac35067c..d56cc2d5 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -386,6 +386,7 @@ class FspAdmin: portal: tractor.Portal, complete: trio.Event, started: trio.Event, + fqsn: str, dst_shm: ShmArray, conf: dict, target: Fsp, @@ -397,7 +398,6 @@ class FspAdmin: cluster and sleeps until signalled to exit. ''' - brokername, sym = self.linked.symbol.front_feed() ns_path = str(target.ns_path) async with ( portal.open_context( @@ -406,8 +406,7 @@ class FspAdmin: cascade, # data feed key - brokername=brokername, - symbol=sym, + fqsn=fqsn, # mems src_shm_token=self.src_shm.token, @@ -429,7 +428,7 @@ class FspAdmin: ): # register output data self._registry[ - (brokername, sym, ns_path) + (fqsn, ns_path) ] = ( stream, dst_shm, @@ -452,11 +451,11 @@ class FspAdmin: ) -> (ShmArray, trio.Event): - fqsn = self.linked.symbol.front_feed() + fqsn = self.linked.symbol.front_fqsn() # allocate an output shm array key, dst_shm, opened = maybe_mk_fsp_shm( - '.'.join(fqsn), + fqsn, target=target, readonly=True, ) @@ -477,6 +476,7 @@ class FspAdmin: portal, complete, started, + fqsn, dst_shm, conf, target, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 9f4dbadb..6316f116 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -268,13 +268,14 @@ class OrderMode: ''' staged = self._staged_order - symbol = staged.symbol + symbol: Symbol = staged.symbol oid = str(uuid.uuid4()) # format order data for ems + fqsn = symbol.front_fqsn() order = staged.copy( update={ - 'symbol': symbol.key, + 'symbol': fqsn, 'oid': oid, } ) @@ -519,8 +520,7 @@ async def open_order_mode( feed: Feed, chart: 'ChartPlotWidget', # noqa - symbol: Symbol, - brokername: str, + fqsn: str, started: trio.Event, ) -> None: @@ -546,8 +546,7 @@ async def open_order_mode( # spawn EMS actor-service async with ( - - open_ems(brokername, symbol) as ( + open_ems(fqsn) as ( book, trades_stream, position_msgs, @@ -556,8 +555,7 @@ async def open_order_mode( trio.open_nursery() as tn, ): - log.info(f'Opening order mode for {brokername}.{symbol.key}') - + log.info(f'Opening order mode for {fqsn}') view = chart.view # annotations editors @@ -566,7 +564,7 @@ async def open_order_mode( # symbol id symbol = chart.linked.symbol - symkey = symbol.key + symkey = symbol.front_fqsn() # map of per-provider account keys to position tracker instances trackers: dict[str, PositionTracker] = {} @@ -610,7 +608,7 @@ async def open_order_mode( log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') startup_pp.update_from_msg(msg) - # allocator + # allocator config alloc = mk_allocator( symbol=symbol, account=account_name, @@ -818,8 +816,18 @@ async def process_trades_and_update_ui( 'position', ): sym = mode.chart.linked.symbol - if msg['symbol'].lower() in sym.key: + pp_msg_symbol = msg['symbol'].lower() + fqsn = sym.front_fqsn() + broker, key = sym.front_feed() + # print( + # f'pp msg symbol: {pp_msg_symbol}\n', + # f'fqsn: {fqsn}\n', + # f'front key: {key}\n', + # ) + if ( + pp_msg_symbol == fqsn.replace(f'.{broker}', '') + ): tracker = mode.trackers[msg['account']] tracker.live_pp.update_from_msg(msg) # update order pane widgets