From 2c4daf08e0f7e60514fc1e3add57d82da34f56af Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Nov 2022 22:30:15 -0500 Subject: [PATCH] Adjust to per-fqsn-oriented `Flume` lookups throughout --- piker/clearing/_ems.py | 23 ++++++++++------- piker/clearing/_paper_engine.py | 2 +- piker/fsp/_engine.py | 23 ++++++++--------- piker/ui/_chart.py | 6 +++-- piker/ui/_display.py | 44 +++++++++++++++++++-------------- piker/ui/_position.py | 15 ++++++----- piker/ui/order_mode.py | 14 ++++++----- 7 files changed, 73 insertions(+), 54 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 2147369b..b478abcc 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -48,6 +48,7 @@ from ..data._source import ( ) from ..data.feed import ( Feed, + Flume, maybe_open_feed, ) from ..ui._notify import notify_from_ems_status_msg @@ -523,13 +524,14 @@ class Router(Struct): maybe_open_feed( [fqsn], loglevel=loglevel, - ) as (feed, quote_stream), + ) as feed, ): brokermod = feed.mod broker = brokermod.name # XXX: this should be initial price quote from target provider - first_quote: dict = feed.first_quotes[fqsn] + flume = feed.flumes[fqsn] + first_quote: dict = flume.first_quote book: DarkBook = self.get_dark_book(broker) book.lasts[fqsn]: float = first_quote['last'] @@ -547,14 +549,16 @@ class Router(Struct): clear_dark_triggers, self, relay.brokerd_stream, - quote_stream, + flume.stream, broker, fqsn, # form: ... book ) client_ready = trio.Event() - task_status.started((relay, feed, client_ready)) + task_status.started( + (relay, feed, client_ready) + ) # sync to the client side by waiting for the stream # connection setup before relaying any existing live @@ -1014,7 +1018,7 @@ async def process_client_order_cmds( brokerd_order_stream: tractor.MsgStream, fqsn: str, - feed: Feed, + flume: Flume, dark_book: DarkBook, router: Router, @@ -1212,7 +1216,7 @@ async def process_client_order_cmds( 'size': size, 'exec_mode': exec_mode, 'action': action, - 'brokers': brokers, # list + 'brokers': _, # list } if ( # "DARK" triggers # submit order to local EMS book and scan loop, @@ -1234,12 +1238,12 @@ async def process_client_order_cmds( # sometimes the real-time feed hasn't come up # so just pull from the latest history. if isnan(last): - last = feed.rt_shm.array[-1]['close'] + last = flume.rt_shm.array[-1]['close'] pred = mk_check(trigger_price, last, action) spread_slap: float = 5 - min_tick = feed.symbols[fqsn].tick_size + min_tick = flume.symbol.tick_size if action == 'buy': tickfilter = ('ask', 'last', 'trade') @@ -1452,11 +1456,12 @@ async def _emsd_main( # start inbound (from attached client) order request processing # main entrypoint, run here until cancelled. try: + flume = feed.flumes[fqsn] await process_client_order_cmds( client_stream, brokerd_stream, fqsn, - feed, + flume, dark_book, _router, ) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 211a29fc..33ca5761 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -578,7 +578,7 @@ async def trades_dialogue( ) # paper engine simulator clearing task - await simulate_fills(feed.stream, client) + await simulate_fills(feed.streams[broker], client) @asynccontextmanager diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 084ff510..eb5eaff4 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -26,7 +26,6 @@ from typing import ( ) import numpy as np -import pyqtgraph as pg import trio from trio_typing import TaskStatus import tractor @@ -35,7 +34,9 @@ from tractor.msg import NamespacePath from ..log import get_logger, get_console_log from .. import data from ..data import attach_shm_array -from ..data.feed import Feed +from ..data.feed import ( + Flume, +) from ..data._sharedmem import ShmArray from ..data._sampling import _default_delay_s from ..data._source import Symbol @@ -79,7 +80,7 @@ async def filter_quotes_by_sym( async def fsp_compute( symbol: Symbol, - feed: Feed, + flume: Flume, quote_stream: trio.abc.ReceiveChannel, src: ShmArray, @@ -107,7 +108,7 @@ async def fsp_compute( filter_quotes_by_sym(fqsn, quote_stream), # XXX: currently the ``ohlcv`` arg - feed.rt_shm, + flume.rt_shm, ) # Conduct a single iteration of fsp with historical bars input @@ -310,12 +311,12 @@ async def cascade( # needs to get throttled the ticks we generate. # tick_throttle=60, - ) as (feed, quote_stream): - symbol = feed.symbols[fqsn] + ) as feed: + flume = feed.flumes[fqsn] + symbol = flume.symbol + assert src.token == flume.rt_shm.token profiler(f'{func}: feed up') - - assert src.token == feed.rt_shm.token # last_len = new_len = len(src.array) func_name = func.__name__ @@ -327,8 +328,8 @@ async def cascade( fsp_compute, symbol=symbol, - feed=feed, - quote_stream=quote_stream, + flume=flume, + quote_stream=flume.stream, # shm src=src, @@ -430,7 +431,7 @@ async def cascade( # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. - async with feed.index_stream( + async with flume.index_stream( int(delay_s) ) as istream: diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index f61ed1d7..93b41095 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -915,14 +915,16 @@ class ChartPlotWidget(pg.PlotWidget): def resume_all_feeds(self): try: for feed in self._feeds.values(): - self.linked.godwidget._root_n.start_soon(feed.resume) + for flume in feed.flumes.values(): + self.linked.godwidget._root_n.start_soon(flume.resume) except RuntimeError: # TODO: cancel the qtractor runtime here? raise def pause_all_feeds(self): for feed in self._feeds.values(): - self.linked.godwidget._root_n.start_soon(feed.pause) + for flume in feed.flumes.values(): + self.linked.godwidget._root_n.start_soon(flume.pause) @property def view(self) -> ChartView: diff --git a/piker/ui/_display.py b/piker/ui/_display.py index af4f8361..c7ed9299 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -33,6 +33,7 @@ import pyqtgraph as pg from ..data.feed import ( open_feed, Feed, + Flume, ) from ..data.types import Struct from ._axes import YAxisLabel @@ -228,7 +229,7 @@ async def graphics_update_loop( nurse: trio.Nursery, godwidget: GodWidget, - feed: Feed, + flume: Flume, wap_in_history: bool = False, vlm_chart: Optional[ChartPlotWidget] = None, @@ -255,8 +256,8 @@ async def graphics_update_loop( fast_chart = linked.chart hist_chart = godwidget.hist_linked.chart - ohlcv = feed.rt_shm - hist_ohlcv = feed.hist_shm + ohlcv = flume.rt_shm + hist_ohlcv = flume.hist_shm # update last price sticky last_price_sticky = fast_chart._ysticks[fast_chart.name] @@ -347,9 +348,9 @@ async def graphics_update_loop( 'i_last_append': i_last, 'i_last': i_last, } - _, hist_step_size_s, _ = feed.get_ds_info() + _, hist_step_size_s, _ = flume.get_ds_info() - async with feed.index_stream( + async with flume.index_stream( # int(hist_step_size_s) # TODO: seems this is more reliable at keeping the slow # chart incremented in view more correctly? @@ -393,7 +394,7 @@ async def graphics_update_loop( nurse.start_soon(increment_history_view) # main real-time quotes update loop - stream: tractor.MsgStream = feed.stream + stream: tractor.MsgStream = flume.stream async for quotes in stream: ds.quotes = quotes @@ -813,13 +814,13 @@ def graphics_update_cycle( async def link_views_with_region( rt_chart: ChartPlotWidget, hist_chart: ChartPlotWidget, - feed: Feed, + flume: Flume, ) -> None: # these value are be only pulled once during shm init/startup - izero_hist = feed.izero_hist - izero_rt = feed.izero_rt + izero_hist = flume.izero_hist + izero_rt = flume.izero_rt # Add the LinearRegionItem to the ViewBox, but tell the ViewBox # to exclude this item when doing auto-range calculations. @@ -846,7 +847,7 @@ async def link_views_with_region( # poll for datums load and timestep detection for _ in range(100): try: - _, _, ratio = feed.get_ds_info() + _, _, ratio = flume.get_ds_info() break except IndexError: await trio.sleep(0.01) @@ -977,8 +978,7 @@ async def display_symbol_data( group_key=True ) - first_fqsn = fqsns[0] - + feed: Feed async with open_feed( fqsns, loglevel=loglevel, @@ -988,11 +988,17 @@ async def display_symbol_data( tick_throttle=_quote_throttle_rate, ) as feed: - ohlcv: ShmArray = feed.rt_shm - hist_ohlcv: ShmArray = feed.hist_shm - symbol = feed.symbols[first_fqsn] - fqsn = symbol.front_fqsn() + # TODO: right now we only show one symbol on charts, but + # overlays are coming muy pronto guey.. + assert len(feed.flumes) == 1 + flume = list(feed.flumes.values())[0] + + ohlcv: ShmArray = flume.rt_shm + hist_ohlcv: ShmArray = flume.hist_shm + + symbol = flume.symbol + fqsn = symbol.fqsn step_size_s = 1 tf_key = tf_in_1s[step_size_s] @@ -1012,7 +1018,7 @@ async def display_symbol_data( hist_linked._symbol = symbol hist_chart = hist_linked.plot_ohlc_main( symbol, - feed.hist_shm, + hist_ohlcv, # in the case of history chart we explicitly set `False` # to avoid internal pane creation. # sidepane=False, @@ -1100,7 +1106,7 @@ async def display_symbol_data( graphics_update_loop, ln, godwidget, - feed, + flume, wap_in_history, vlm_chart, ) @@ -1124,7 +1130,7 @@ async def display_symbol_data( await link_views_with_region( ohlc_chart, hist_chart, - feed, + flume, ) mode: OrderMode diff --git a/piker/ui/_position.py b/piker/ui/_position.py index f2ec1466..98584161 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -45,7 +45,10 @@ from ..calc import humanize, pnl, puterize from ..clearing._allocate import Allocator from ..pp import Position from ..data._normalize import iterticks -from ..data.feed import Feed +from ..data.feed import ( + Feed, + Flume, +) from ..data.types import Struct from ._label import Label from ._lines import LevelLine, order_line @@ -64,7 +67,7 @@ _pnl_tasks: dict[str, bool] = {} async def update_pnl_from_feed( - feed: Feed, + flume: Flume, order_mode: OrderMode, # noqa tracker: PositionTracker, @@ -95,7 +98,7 @@ async def update_pnl_from_feed( # real-time update pnl on the status pane try: - async with feed.stream.subscribe() as bstream: + async with flume.stream.subscribe() as bstream: # last_tick = time.time() async for quotes in bstream: @@ -390,12 +393,12 @@ class SettingsPane: mode = self.order_mode sym = mode.chart.linked.symbol size = tracker.live_pp.size - feed = mode.quote_feed + flume: Feed = mode.feed.flumes[sym.fqsn] pnl_value = 0 if size: # last historical close price - last = feed.rt_shm.array[-1][['close']][0] + last = flume.rt_shm.array[-1][['close']][0] pnl_value = copysign(1, size) * pnl( tracker.live_pp.ppu, last, @@ -408,7 +411,7 @@ class SettingsPane: _pnl_tasks[fqsn] = True self.order_mode.nursery.start_soon( update_pnl_from_feed, - feed, + flume, mode, tracker, ) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index fa8ecbce..7e4ae066 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -44,7 +44,10 @@ from ..clearing._allocate import ( ) from ._style import _font from ..data._source import Symbol -from ..data.feed import Feed +from ..data.feed import ( + Feed, + Flume, +) from ..data.types import Struct from ..log import get_logger from ._editors import LineEditor, ArrowEditor @@ -118,7 +121,6 @@ class OrderMode: chart: ChartPlotWidget # type: ignore # noqa hist_chart: ChartPlotWidget # type: ignore # noqa nursery: trio.Nursery # used by ``ui._position`` code? - quote_feed: Feed book: OrderBook lines: LineEditor arrows: ArrowEditor @@ -514,12 +516,13 @@ class OrderMode: # XXX: seems to fail on certain types of races? # assert len(lines) == 2 if lines: - _, _, ratio = self.feed.get_ds_info() + flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn] + _, _, ratio = flume.get_ds_info() for i, chart in [ (arrow_index, self.chart), - (self.feed.izero_hist + (flume.izero_hist + - round((arrow_index - self.feed.izero_rt)/ratio), + round((arrow_index - flume.izero_rt)/ratio), self.hist_chart) ]: self.arrows.add( @@ -801,7 +804,6 @@ async def open_order_mode( chart, hist_chart, tn, - feed, book, lines, arrows,