Port everything strictly to `Position.mkt` and `Flume.mkt`

master
Tyler Goodlet 2023-05-24 12:16:17 -04:00
parent 9a063ccb11
commit bd8e4760d5
15 changed files with 80 additions and 113 deletions

View File

@ -203,7 +203,7 @@ class Allocator(Struct):
# compute a fractional slots size to display # compute a fractional slots size to display
slots_used = self.slots_used( slots_used = self.slots_used(
Position( Position(
symbol=sym, mkt=sym,
size=order_size, size=order_size,
ppu=price, ppu=price,
bs_mktid=sym, bs_mktid=sym,

View File

@ -295,6 +295,11 @@ class MktPair(Struct, frozen=True):
dst=dst, dst=dst,
src=src, src=src,
**msg, **msg,
# XXX NOTE: ``msgspec`` can encode `Decimal`
# but it doesn't decide to it by default since
# we aren't spec-cing these msgs as structs, SO
# we have to ensure we do a struct type case (which `.copy()`
# does) to ensure we get the right type!
).copy() ).copy()
@property @property

View File

@ -79,11 +79,7 @@ class Position(Struct):
file system (in TOML) and to interchange as a msg over IPC. file system (in TOML) and to interchange as a msg over IPC.
''' '''
symbol: Symbol | MktPair mkt: MktPair
@property
def mkt(self) -> MktPair:
return self.symbol
# can be +ve or -ve for long/short # can be +ve or -ve for long/short
size: float size: float
@ -143,29 +139,12 @@ class Position(Struct):
# listing venue here even when the backend isn't providing # listing venue here even when the backend isn't providing
# it via the trades ledger.. # it via the trades ledger..
# drop symbol obj in serialized form # drop symbol obj in serialized form
s = d.pop('symbol') mkt: MktPair = d.pop('mkt')
fqme = s.fqme
broker, mktep, venue, suffix = unpack_fqme(fqme)
if isinstance(s, Symbol):
sym_info = s.broker_info[broker]
d['asset_type'] = sym_info['asset_type']
d['price_tick'] = (
sym_info.get('price_tick_size')
or
s.tick_size
)
d['size_tick'] = (
sym_info.get('lot_tick_size')
or
s.lot_tick_size
)
# the newwww wayyy B)
else:
mkt = s
assert isinstance(mkt, MktPair) assert isinstance(mkt, MktPair)
fqme = mkt.fqme
broker, mktep, venue, suffix = unpack_fqme(fqme)
# an asset resolved mkt where we have ``Asset`` info about # an asset resolved mkt where we have ``Asset`` info about
# each tradeable asset in the market. # each tradeable asset in the market.
if mkt.resolved: if mkt.resolved:
@ -267,12 +246,12 @@ class Position(Struct):
) -> None: ) -> None:
# XXX: better place to do this? # XXX: better place to do this?
symbol = self.symbol mkt = self.mkt
# TODO: switch to new fields..? # TODO: switch to new fields..?
# .size_tick_digits, .price_tick_digits # .size_tick_digits, .price_tick_digits
size_tick_digits = symbol.lot_size_digits size_tick_digits = mkt.lot_size_digits
price_tick_digits = symbol.tick_size_digits price_tick_digits = mkt.tick_size_digits
self.ppu = round( self.ppu = round(
# TODO: change this to ppu? # TODO: change this to ppu?
@ -470,7 +449,7 @@ class Position(Struct):
size = round(size * self.split_ratio) size = round(size * self.split_ratio)
return float( return float(
self.symbol.quantize(size), self.mkt.quantize(size),
) )
def minimize_clears( def minimize_clears(
@ -571,7 +550,7 @@ class PpTable(Struct):
if not pp: if not pp:
# if no existing pp, allocate fresh one. # if no existing pp, allocate fresh one.
pp = pps[bs_mktid] = Position( pp = pps[bs_mktid] = Position(
mkt, mkt=mkt,
size=0.0, size=0.0,
ppu=0.0, ppu=0.0,
bs_mktid=bs_mktid, bs_mktid=bs_mktid,
@ -583,8 +562,8 @@ class PpTable(Struct):
# a shorter string), instead use the one from the # a shorter string), instead use the one from the
# transaction since it likely has (more) full # transaction since it likely has (more) full
# information from the provider. # information from the provider.
if len(pp.symbol.fqme) < len(fqme): if len(pp.mkt.fqme) < len(fqme):
pp.symbol = mkt pp.mkt = mkt
clears = pp.clears clears = pp.clears
if clears: if clears:
@ -735,7 +714,7 @@ class PpTable(Struct):
if closed: if closed:
bs_mktid: str bs_mktid: str
for bs_mktid, pos in closed.items(): for bs_mktid, pos in closed.items():
fqme: str = pos.symbol.fqme fqme: str = pos.mkt.fqme
if fqme in self.conf: if fqme in self.conf:
self.conf.pop(fqme) self.conf.pop(fqme)
else: else:

View File

@ -345,6 +345,7 @@ async def update_and_audit_msgs(
) -> list[BrokerdPosition]: ) -> list[BrokerdPosition]:
msgs: list[BrokerdPosition] = [] msgs: list[BrokerdPosition] = []
p: Position
for p in pps: for p in pps:
bs_mktid = p.bs_mktid bs_mktid = p.bs_mktid
@ -427,7 +428,7 @@ async def update_and_audit_msgs(
# right since `.broker` is already included? # right since `.broker` is already included?
account=f'ib.{acctid}', account=f'ib.{acctid}',
# XXX: the `.ib` is stripped..? # XXX: the `.ib` is stripped..?
symbol=p.symbol.fqme, symbol=p.mkt.fqme,
# currency=ibppmsg.currency, # currency=ibppmsg.currency,
size=p.size, size=p.size,
avg_price=p.ppu, avg_price=p.ppu,

View File

@ -1281,7 +1281,7 @@ async def process_client_order_cmds(
# TODO: make this configurable from our top level # TODO: make this configurable from our top level
# config, prolly in a .clearing` section? # config, prolly in a .clearing` section?
spread_slap: float = 5 spread_slap: float = 5
min_tick = float(flume.symbol.size_tick) min_tick = float(flume.mkt.size_tick)
min_tick_digits = float_digits(min_tick) min_tick_digits = float_digits(min_tick)
if action == 'buy': if action == 'buy':

View File

@ -611,7 +611,7 @@ async def trades_dialogue(
pp_msgs.append(BrokerdPosition( pp_msgs.append(BrokerdPosition(
broker=broker, broker=broker,
account='paper', account='paper',
symbol=pos.symbol.fqme, symbol=pos.mkt.fqme,
size=pos.size, size=pos.size,
avg_price=pos.ppu, avg_price=pos.ppu,
)) ))

View File

@ -22,7 +22,6 @@ real-time data processing data-structures.
""" """
from __future__ import annotations from __future__ import annotations
# from decimal import Decimal
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
@ -31,13 +30,8 @@ import tractor
import pendulum import pendulum
import numpy as np import numpy as np
from ..accounting._mktinfo import ( from ..accounting import MktPair
MktPair, from ._util import log
Symbol,
)
from ._util import (
log,
)
from .types import Struct from .types import Struct
from ._sharedmem import ( from ._sharedmem import (
attach_shm_array, attach_shm_array,
@ -94,18 +88,10 @@ class Flume(Struct):
queuing properties. queuing properties.
''' '''
mkt: MktPair | Symbol mkt: MktPair
first_quote: dict first_quote: dict
_rt_shm_token: _Token _rt_shm_token: _Token
@property
def symbol(self) -> MktPair | Symbol:
log.warning(
'`Flume.symbol` is deprecated!\n'
'Use `.mkt: MktPair` instead!'
)
return self.mkt
# optional since some data flows won't have a "downsampled" history # optional since some data flows won't have a "downsampled" history
# buffer/stream (eg. FSPs). # buffer/stream (eg. FSPs).
_hist_shm_token: _Token | None = None _hist_shm_token: _Token | None = None
@ -208,18 +194,7 @@ class Flume(Struct):
''' '''
mkt_msg = msg.pop('mkt') mkt_msg = msg.pop('mkt')
if 'dst' in mkt_msg:
mkt = MktPair.from_msg(mkt_msg) mkt = MktPair.from_msg(mkt_msg)
else:
# XXX NOTE: ``msgspec`` can encode `Decimal`
# but it doesn't decide to it by default since
# we aren't spec-cing these msgs as structs, SO
# we have to ensure we do a struct type case (which `.copy()`
# does) to ensure we get the right type!
mkt = Symbol(**mkt_msg).copy()
return cls(mkt=mkt, **msg) return cls(mkt=mkt, **msg)
def get_index( def get_index(

View File

@ -648,7 +648,7 @@ async def manage_history(
# (maybe) allocate shm array for this broker/symbol which will # (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing. # be used for fast near-term history capture and processing.
hist_shm, opened = maybe_open_shm_array( hist_shm, opened = maybe_open_shm_array(
key=f'piker.{service}[{uuid[:16]}.{fqme}.hist', key=f'piker.{service}[{uuid[:16]}].{fqme}.hist',
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@ -665,7 +665,7 @@ async def manage_history(
) )
rt_shm, opened = maybe_open_shm_array( rt_shm, opened = maybe_open_shm_array(
key=f'piker.{service}[{uuid[:16]}.{fqme}.rt', key=f'piker.{service}[{uuid[:16]}].{fqme}.rt',
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),

View File

@ -174,16 +174,6 @@ def fsp(
return Fsp(wrapped, outputs=(wrapped.__name__,)) return Fsp(wrapped, outputs=(wrapped.__name__,))
def mk_fsp_shm_key(
sym: str,
target: Fsp
) -> str:
actor_name, uuid = tractor.current_actor().uid
uuid_snip: str = uuid[:16]
return f'piker.{actor_name}[{uuid_snip}].{sym}.{target.name}'
def maybe_mk_fsp_shm( def maybe_mk_fsp_shm(
sym: str, sym: str,
target: Fsp, target: Fsp,
@ -207,7 +197,10 @@ def maybe_mk_fsp_shm(
[(field_name, float) for field_name in target.outputs] [(field_name, float) for field_name in target.outputs]
) )
key = mk_fsp_shm_key(sym, target) # (attempt to) uniquely key the fsp shm buffers
actor_name, uuid = tractor.current_actor().uid
uuid_snip: str = uuid[:16]
key: str = f'piker.{actor_name}[{uuid_snip}].{sym}.{target.name}'
shm, opened = maybe_open_shm_array( shm, opened = maybe_open_shm_array(
key, key,

View File

@ -45,7 +45,7 @@ from ..data._sampling import (
_default_delay_s, _default_delay_s,
open_sample_stream, open_sample_stream,
) )
from ..accounting._mktinfo import Symbol from ..accounting import MktPair
from ._api import ( from ._api import (
Fsp, Fsp,
_load_builtins, _load_builtins,
@ -85,7 +85,7 @@ async def filter_quotes_by_sym(
async def fsp_compute( async def fsp_compute(
symbol: Symbol, mkt: MktPair,
flume: Flume, flume: Flume,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -104,7 +104,7 @@ async def fsp_compute(
disabled=True disabled=True
) )
fqme = symbol.fqme fqme = mkt.fqme
out_stream = func( out_stream = func(
# TODO: do we even need this if we do the feed api right? # TODO: do we even need this if we do the feed api right?
@ -340,7 +340,7 @@ async def cascade(
) as feed: ) as feed:
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
symbol = flume.symbol mkt = flume.mkt
assert src.token == flume.rt_shm.token assert src.token == flume.rt_shm.token
profiler(f'{func}: feed up') profiler(f'{func}: feed up')
@ -352,7 +352,7 @@ async def cascade(
fsp_target = partial( fsp_target = partial(
fsp_compute, fsp_compute,
symbol=symbol, mkt=mkt,
flume=flume, flume=flume,
quote_stream=flume.stream, quote_stream=flume.stream,

View File

@ -762,7 +762,7 @@ async def open_tsdb_client(
if fqme: if fqme:
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
symbol = flume.symbol symbol = flume.mkt
if symbol: if symbol:
fqme = symbol.fqme fqme = symbol.fqme

View File

@ -231,7 +231,7 @@ async def increment_history_view(
# l3 = ds.viz.shm.array[-3:] # l3 = ds.viz.shm.array[-3:]
# print( # print(
# f'fast step for {ds.flume.symbol.fqme}:\n' # f'fast step for {ds.flume.mkt.fqme}:\n'
# f'{list(l3["time"])}\n' # f'{list(l3["time"])}\n'
# f'{l3}\n' # f'{l3}\n'
# ) # )
@ -319,7 +319,7 @@ async def graphics_update_loop(
for fqme, flume in feed.flumes.items(): for fqme, flume in feed.flumes.items():
ohlcv = flume.rt_shm ohlcv = flume.rt_shm
hist_ohlcv = flume.hist_shm hist_ohlcv = flume.hist_shm
symbol = flume.symbol symbol = flume.mkt
fqme = symbol.fqme fqme = symbol.fqme
# update last price sticky # update last price sticky
@ -360,7 +360,7 @@ async def graphics_update_loop(
last, volume = ohlcv.array[-1][['close', 'volume']] last, volume = ohlcv.array[-1][['close', 'volume']]
symbol = flume.symbol symbol = flume.mkt
l1 = L1Labels( l1 = L1Labels(
fast_pi, fast_pi,
@ -1007,7 +1007,7 @@ async def link_views_with_region(
hist_pi.addItem(region, ignoreBounds=True) hist_pi.addItem(region, ignoreBounds=True)
region.setOpacity(6/16) region.setOpacity(6/16)
viz = rt_chart.get_viz(flume.symbol.fqme) viz = rt_chart.get_viz(flume.mkt.fqme)
assert viz assert viz
index_field = viz.index_field index_field = viz.index_field
@ -1034,7 +1034,7 @@ async def link_views_with_region(
# HFT/real-time chart. # HFT/real-time chart.
rng = mn, mx = viewRange[0] rng = mn, mx = viewRange[0]
# hist_viz = hist_chart.get_viz(flume.symbol.fqme) # hist_viz = hist_chart.get_viz(flume.mkt.fqme)
# hist = hist_viz.shm.array[-3:] # hist = hist_viz.shm.array[-3:]
# print( # print(
# f'mn: {mn}\n' # f'mn: {mn}\n'
@ -1279,13 +1279,13 @@ async def display_symbol_data(
# TODO NOTE: THIS CONTROLS WHAT SYMBOL IS USED FOR ORDER MODE # TODO NOTE: THIS CONTROLS WHAT SYMBOL IS USED FOR ORDER MODE
# SUBMISSIONS, we need to make this switch based on selection. # SUBMISSIONS, we need to make this switch based on selection.
rt_linked._symbol = flume.symbol rt_linked._symbol = flume.mkt
hist_linked._symbol = flume.symbol hist_linked._symbol = flume.mkt
ohlcv: ShmArray = flume.rt_shm ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm hist_ohlcv: ShmArray = flume.hist_shm
symbol = flume.symbol symbol = flume.mkt
fqme = symbol.fqme fqme = symbol.fqme
hist_chart = hist_linked.plot_ohlc_main( hist_chart = hist_linked.plot_ohlc_main(
@ -1378,7 +1378,7 @@ async def display_symbol_data(
ohlcv: ShmArray = flume.rt_shm ohlcv: ShmArray = flume.rt_shm
hist_ohlcv: ShmArray = flume.hist_shm hist_ohlcv: ShmArray = flume.hist_shm
symbol = flume.symbol symbol = flume.mkt
fqme = symbol.fqme fqme = symbol.fqme
hist_pi = hist_chart.overlay_plotitem( hist_pi = hist_chart.overlay_plotitem(

View File

@ -46,7 +46,7 @@ from ..data._sharedmem import (
try_read, try_read,
) )
from ..data.feed import Flume from ..data.feed import Flume
from ..accounting._mktinfo import Symbol from ..accounting import MktPair
from ._chart import ( from ._chart import (
ChartPlotWidget, ChartPlotWidget,
LinkedSplits, LinkedSplits,
@ -476,7 +476,8 @@ class FspAdmin:
) -> (Flume, trio.Event): ) -> (Flume, trio.Event):
fqme = self.flume.symbol.get_fqme(delim_char='') src_mkt: MktPair = self.flume.mkt
fqme: str = src_mkt.get_fqme(delim_char='')
# allocate an output shm array # allocate an output shm array
key, dst_shm, opened = maybe_mk_fsp_shm( key, dst_shm, opened = maybe_mk_fsp_shm(
@ -488,14 +489,27 @@ class FspAdmin:
portal = self.cluster.get(worker_name) or self.rr_next_portal() portal = self.cluster.get(worker_name) or self.rr_next_portal()
provider_tag = portal.channel.uid provider_tag = portal.channel.uid
symbol = Symbol( # TODO: this should probably be turned into a
key=key, # ``Cascade`` type which describes the routing
broker_info={ # of an fsp's IO in terms of sinc -> source
provider_tag: {'asset_type': 'fsp'}, # shm/IPC endpoints?
}, mkt = MktPair(
# make this a couple addrs encapsing
# the flume routing?
src=src_mkt.dst,
dst=target.name,
# make this a precision / rounding value?
price_tick=src_mkt.price_tick,
size_tick=src_mkt.size_tick,
bs_mktid=target.name,
broker='piker',
_atype='fsp',
) )
dst_fsp_flume = Flume( dst_fsp_flume = Flume(
mkt=symbol, mkt=mkt,
_rt_shm_token=dst_shm.token, _rt_shm_token=dst_shm.token,
first_quote={}, first_quote={},

View File

@ -91,7 +91,7 @@ async def update_pnl_from_feed(
pp: PositionTracker = order_mode.current_pp pp: PositionTracker = order_mode.current_pp
live: Position = pp.live_pp live: Position = pp.live_pp
key: str = live.symbol.fqme key: str = live.mkt.fqme
log.info(f'Starting pnl display for {pp.alloc.account}') log.info(f'Starting pnl display for {pp.alloc.account}')
@ -862,7 +862,7 @@ class PositionTracker:
alloc = self.alloc alloc = self.alloc
# update allocator settings # update allocator settings
asset_type = pp.symbol.type_key asset_type = pp.mkt.type_key
# specific configs by asset class / type # specific configs by asset class / type
if asset_type in _derivs: if asset_type in _derivs:

View File

@ -771,7 +771,7 @@ async def open_order_mode(
# net-zero pp # net-zero pp
startup_pp = Position( startup_pp = Position(
symbol=symbol, mkt=symbol,
size=0, size=0,
ppu=0, ppu=0,