From bd8e4760d599210f3b48c62c69f0de74a59fd2d3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 24 May 2023 12:16:17 -0400 Subject: [PATCH] Port everything strictly to `Position.mkt` and `Flume.mkt` --- piker/accounting/_allocate.py | 2 +- piker/accounting/_mktinfo.py | 5 +++ piker/accounting/_pos.py | 61 +++++++++++---------------------- piker/brokers/ib/broker.py | 3 +- piker/clearing/_ems.py | 2 +- piker/clearing/_paper_engine.py | 2 +- piker/data/flows.py | 33 +++--------------- piker/data/history.py | 4 +-- piker/fsp/_api.py | 15 +++----- piker/fsp/_engine.py | 10 +++--- piker/service/marketstore.py | 2 +- piker/ui/_display.py | 18 +++++----- piker/ui/_fsp.py | 30 +++++++++++----- piker/ui/_position.py | 4 +-- piker/ui/order_mode.py | 2 +- 15 files changed, 80 insertions(+), 113 deletions(-) diff --git a/piker/accounting/_allocate.py b/piker/accounting/_allocate.py index 4bafc2f6..18900c9f 100644 --- a/piker/accounting/_allocate.py +++ b/piker/accounting/_allocate.py @@ -203,7 +203,7 @@ class Allocator(Struct): # compute a fractional slots size to display slots_used = self.slots_used( Position( - symbol=sym, + mkt=sym, size=order_size, ppu=price, bs_mktid=sym, diff --git a/piker/accounting/_mktinfo.py b/piker/accounting/_mktinfo.py index 341ff2db..653c8d04 100644 --- a/piker/accounting/_mktinfo.py +++ b/piker/accounting/_mktinfo.py @@ -295,6 +295,11 @@ class MktPair(Struct, frozen=True): dst=dst, src=src, **msg, + # XXX NOTE: ``msgspec`` can encode `Decimal` + # but it doesn't decide to it by default since + # we aren't spec-cing these msgs as structs, SO + # we have to ensure we do a struct type case (which `.copy()` + # does) to ensure we get the right type! ).copy() @property diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index 453ee324..dda39177 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -79,11 +79,7 @@ class Position(Struct): file system (in TOML) and to interchange as a msg over IPC. ''' - symbol: Symbol | MktPair - - @property - def mkt(self) -> MktPair: - return self.symbol + mkt: MktPair # can be +ve or -ve for long/short size: float @@ -143,37 +139,20 @@ class Position(Struct): # listing venue here even when the backend isn't providing # it via the trades ledger.. # drop symbol obj in serialized form - s = d.pop('symbol') - fqme = s.fqme + mkt: MktPair = d.pop('mkt') + assert isinstance(mkt, MktPair) + + fqme = mkt.fqme broker, mktep, venue, suffix = unpack_fqme(fqme) - if isinstance(s, Symbol): - sym_info = s.broker_info[broker] - d['asset_type'] = sym_info['asset_type'] - d['price_tick'] = ( - sym_info.get('price_tick_size') - or - s.tick_size - ) - d['size_tick'] = ( - sym_info.get('lot_tick_size') - or - s.lot_tick_size - ) + # an asset resolved mkt where we have ``Asset`` info about + # each tradeable asset in the market. + if mkt.resolved: + dst: Asset = mkt.dst + d['asset_type'] = dst.atype - # the newwww wayyy B) - else: - mkt = s - assert isinstance(mkt, MktPair) - - # an asset resolved mkt where we have ``Asset`` info about - # each tradeable asset in the market. - if mkt.resolved: - dst: Asset = mkt.dst - d['asset_type'] = dst.atype - - d['price_tick'] = mkt.price_tick - d['size_tick'] = mkt.size_tick + d['price_tick'] = mkt.price_tick + d['size_tick'] = mkt.size_tick if self.expiry is None: d.pop('expiry', None) @@ -267,12 +246,12 @@ class Position(Struct): ) -> None: # XXX: better place to do this? - symbol = self.symbol + mkt = self.mkt # TODO: switch to new fields..? # .size_tick_digits, .price_tick_digits - size_tick_digits = symbol.lot_size_digits - price_tick_digits = symbol.tick_size_digits + size_tick_digits = mkt.lot_size_digits + price_tick_digits = mkt.tick_size_digits self.ppu = round( # TODO: change this to ppu? @@ -470,7 +449,7 @@ class Position(Struct): size = round(size * self.split_ratio) return float( - self.symbol.quantize(size), + self.mkt.quantize(size), ) def minimize_clears( @@ -571,7 +550,7 @@ class PpTable(Struct): if not pp: # if no existing pp, allocate fresh one. pp = pps[bs_mktid] = Position( - mkt, + mkt=mkt, size=0.0, ppu=0.0, bs_mktid=bs_mktid, @@ -583,8 +562,8 @@ class PpTable(Struct): # a shorter string), instead use the one from the # transaction since it likely has (more) full # information from the provider. - if len(pp.symbol.fqme) < len(fqme): - pp.symbol = mkt + if len(pp.mkt.fqme) < len(fqme): + pp.mkt = mkt clears = pp.clears if clears: @@ -735,7 +714,7 @@ class PpTable(Struct): if closed: bs_mktid: str for bs_mktid, pos in closed.items(): - fqme: str = pos.symbol.fqme + fqme: str = pos.mkt.fqme if fqme in self.conf: self.conf.pop(fqme) else: diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 1667d5f8..bcd947ea 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -345,6 +345,7 @@ async def update_and_audit_msgs( ) -> list[BrokerdPosition]: msgs: list[BrokerdPosition] = [] + p: Position for p in pps: bs_mktid = p.bs_mktid @@ -427,7 +428,7 @@ async def update_and_audit_msgs( # right since `.broker` is already included? account=f'ib.{acctid}', # XXX: the `.ib` is stripped..? - symbol=p.symbol.fqme, + symbol=p.mkt.fqme, # currency=ibppmsg.currency, size=p.size, avg_price=p.ppu, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 77cad1bd..ee7ec284 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -1281,7 +1281,7 @@ async def process_client_order_cmds( # TODO: make this configurable from our top level # config, prolly in a .clearing` section? spread_slap: float = 5 - min_tick = float(flume.symbol.size_tick) + min_tick = float(flume.mkt.size_tick) min_tick_digits = float_digits(min_tick) if action == 'buy': diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index d6e29245..bf4f0948 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -611,7 +611,7 @@ async def trades_dialogue( pp_msgs.append(BrokerdPosition( broker=broker, account='paper', - symbol=pos.symbol.fqme, + symbol=pos.mkt.fqme, size=pos.size, avg_price=pos.ppu, )) diff --git a/piker/data/flows.py b/piker/data/flows.py index 07ca304e..7776a602 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -22,7 +22,6 @@ real-time data processing data-structures. """ from __future__ import annotations -# from decimal import Decimal from typing import ( TYPE_CHECKING, ) @@ -31,13 +30,8 @@ import tractor import pendulum import numpy as np -from ..accounting._mktinfo import ( - MktPair, - Symbol, -) -from ._util import ( - log, -) +from ..accounting import MktPair +from ._util import log from .types import Struct from ._sharedmem import ( attach_shm_array, @@ -94,18 +88,10 @@ class Flume(Struct): queuing properties. ''' - mkt: MktPair | Symbol + mkt: MktPair first_quote: dict _rt_shm_token: _Token - @property - def symbol(self) -> MktPair | Symbol: - log.warning( - '`Flume.symbol` is deprecated!\n' - 'Use `.mkt: MktPair` instead!' - ) - return self.mkt - # optional since some data flows won't have a "downsampled" history # buffer/stream (eg. FSPs). _hist_shm_token: _Token | None = None @@ -208,18 +194,7 @@ class Flume(Struct): ''' mkt_msg = msg.pop('mkt') - - if 'dst' in mkt_msg: - mkt = MktPair.from_msg(mkt_msg) - - else: - # XXX NOTE: ``msgspec`` can encode `Decimal` - # but it doesn't decide to it by default since - # we aren't spec-cing these msgs as structs, SO - # we have to ensure we do a struct type case (which `.copy()` - # does) to ensure we get the right type! - mkt = Symbol(**mkt_msg).copy() - + mkt = MktPair.from_msg(mkt_msg) return cls(mkt=mkt, **msg) def get_index( diff --git a/piker/data/history.py b/piker/data/history.py index f8260c86..28a4590e 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -648,7 +648,7 @@ async def manage_history( # (maybe) allocate shm array for this broker/symbol which will # be used for fast near-term history capture and processing. hist_shm, opened = maybe_open_shm_array( - key=f'piker.{service}[{uuid[:16]}.{fqme}.hist', + key=f'piker.{service}[{uuid[:16]}].{fqme}.hist', # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -665,7 +665,7 @@ async def manage_history( ) rt_shm, opened = maybe_open_shm_array( - key=f'piker.{service}[{uuid[:16]}.{fqme}.rt', + key=f'piker.{service}[{uuid[:16]}].{fqme}.rt', # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index 8226d16b..11d1e7dc 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -174,16 +174,6 @@ def fsp( return Fsp(wrapped, outputs=(wrapped.__name__,)) -def mk_fsp_shm_key( - sym: str, - target: Fsp - -) -> str: - actor_name, uuid = tractor.current_actor().uid - uuid_snip: str = uuid[:16] - return f'piker.{actor_name}[{uuid_snip}].{sym}.{target.name}' - - def maybe_mk_fsp_shm( sym: str, target: Fsp, @@ -207,7 +197,10 @@ def maybe_mk_fsp_shm( [(field_name, float) for field_name in target.outputs] ) - key = mk_fsp_shm_key(sym, target) + # (attempt to) uniquely key the fsp shm buffers + actor_name, uuid = tractor.current_actor().uid + uuid_snip: str = uuid[:16] + key: str = f'piker.{actor_name}[{uuid_snip}].{sym}.{target.name}' shm, opened = maybe_open_shm_array( key, diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 13dcfccb..9a6ebddb 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -45,7 +45,7 @@ from ..data._sampling import ( _default_delay_s, open_sample_stream, ) -from ..accounting._mktinfo import Symbol +from ..accounting import MktPair from ._api import ( Fsp, _load_builtins, @@ -85,7 +85,7 @@ async def filter_quotes_by_sym( async def fsp_compute( - symbol: Symbol, + mkt: MktPair, flume: Flume, quote_stream: trio.abc.ReceiveChannel, @@ -104,7 +104,7 @@ async def fsp_compute( disabled=True ) - fqme = symbol.fqme + fqme = mkt.fqme out_stream = func( # TODO: do we even need this if we do the feed api right? @@ -340,7 +340,7 @@ async def cascade( ) as feed: flume = feed.flumes[fqme] - symbol = flume.symbol + mkt = flume.mkt assert src.token == flume.rt_shm.token profiler(f'{func}: feed up') @@ -352,7 +352,7 @@ async def cascade( fsp_target = partial( fsp_compute, - symbol=symbol, + mkt=mkt, flume=flume, quote_stream=flume.stream, diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index 930c44da..68b9e953 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -762,7 +762,7 @@ async def open_tsdb_client( if fqme: flume = feed.flumes[fqme] - symbol = flume.symbol + symbol = flume.mkt if symbol: fqme = symbol.fqme diff --git a/piker/ui/_display.py b/piker/ui/_display.py index eb8e330b..04b363b1 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -231,7 +231,7 @@ async def increment_history_view( # l3 = ds.viz.shm.array[-3:] # print( - # f'fast step for {ds.flume.symbol.fqme}:\n' + # f'fast step for {ds.flume.mkt.fqme}:\n' # f'{list(l3["time"])}\n' # f'{l3}\n' # ) @@ -319,7 +319,7 @@ async def graphics_update_loop( for fqme, flume in feed.flumes.items(): ohlcv = flume.rt_shm hist_ohlcv = flume.hist_shm - symbol = flume.symbol + symbol = flume.mkt fqme = symbol.fqme # update last price sticky @@ -360,7 +360,7 @@ async def graphics_update_loop( last, volume = ohlcv.array[-1][['close', 'volume']] - symbol = flume.symbol + symbol = flume.mkt l1 = L1Labels( fast_pi, @@ -1007,7 +1007,7 @@ async def link_views_with_region( hist_pi.addItem(region, ignoreBounds=True) region.setOpacity(6/16) - viz = rt_chart.get_viz(flume.symbol.fqme) + viz = rt_chart.get_viz(flume.mkt.fqme) assert viz index_field = viz.index_field @@ -1034,7 +1034,7 @@ async def link_views_with_region( # HFT/real-time chart. rng = mn, mx = viewRange[0] - # hist_viz = hist_chart.get_viz(flume.symbol.fqme) + # hist_viz = hist_chart.get_viz(flume.mkt.fqme) # hist = hist_viz.shm.array[-3:] # print( # f'mn: {mn}\n' @@ -1279,13 +1279,13 @@ async def display_symbol_data( # TODO NOTE: THIS CONTROLS WHAT SYMBOL IS USED FOR ORDER MODE # SUBMISSIONS, we need to make this switch based on selection. - rt_linked._symbol = flume.symbol - hist_linked._symbol = flume.symbol + rt_linked._symbol = flume.mkt + hist_linked._symbol = flume.mkt ohlcv: ShmArray = flume.rt_shm hist_ohlcv: ShmArray = flume.hist_shm - symbol = flume.symbol + symbol = flume.mkt fqme = symbol.fqme hist_chart = hist_linked.plot_ohlc_main( @@ -1378,7 +1378,7 @@ async def display_symbol_data( ohlcv: ShmArray = flume.rt_shm hist_ohlcv: ShmArray = flume.hist_shm - symbol = flume.symbol + symbol = flume.mkt fqme = symbol.fqme hist_pi = hist_chart.overlay_plotitem( diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 6435e970..f942ff14 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -46,7 +46,7 @@ from ..data._sharedmem import ( try_read, ) from ..data.feed import Flume -from ..accounting._mktinfo import Symbol +from ..accounting import MktPair from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -476,7 +476,8 @@ class FspAdmin: ) -> (Flume, trio.Event): - fqme = self.flume.symbol.get_fqme(delim_char='') + src_mkt: MktPair = self.flume.mkt + fqme: str = src_mkt.get_fqme(delim_char='') # allocate an output shm array key, dst_shm, opened = maybe_mk_fsp_shm( @@ -488,14 +489,27 @@ class FspAdmin: portal = self.cluster.get(worker_name) or self.rr_next_portal() provider_tag = portal.channel.uid - symbol = Symbol( - key=key, - broker_info={ - provider_tag: {'asset_type': 'fsp'}, - }, + # TODO: this should probably be turned into a + # ``Cascade`` type which describes the routing + # of an fsp's IO in terms of sinc -> source + # shm/IPC endpoints? + mkt = MktPair( + + # make this a couple addrs encapsing + # the flume routing? + src=src_mkt.dst, + dst=target.name, + + # make this a precision / rounding value? + price_tick=src_mkt.price_tick, + size_tick=src_mkt.size_tick, + + bs_mktid=target.name, + broker='piker', + _atype='fsp', ) dst_fsp_flume = Flume( - mkt=symbol, + mkt=mkt, _rt_shm_token=dst_shm.token, first_quote={}, diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 7ec859da..59ab434d 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -91,7 +91,7 @@ async def update_pnl_from_feed( pp: PositionTracker = order_mode.current_pp live: Position = pp.live_pp - key: str = live.symbol.fqme + key: str = live.mkt.fqme log.info(f'Starting pnl display for {pp.alloc.account}') @@ -862,7 +862,7 @@ class PositionTracker: alloc = self.alloc # update allocator settings - asset_type = pp.symbol.type_key + asset_type = pp.mkt.type_key # specific configs by asset class / type if asset_type in _derivs: diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 098bf14f..44558251 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -771,7 +771,7 @@ async def open_order_mode( # net-zero pp startup_pp = Position( - symbol=symbol, + mkt=symbol, size=0, ppu=0,