Adjust to per-fqsn-oriented `Flume` lookups throughout

agg_feedz
Tyler Goodlet 2022-11-09 22:30:15 -05:00
parent 7daab6329d
commit 2c4daf08e0
7 changed files with 73 additions and 54 deletions
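In short: per-symbol state that previously hung directly off `Feed` (shm buffers, quote streams, symbol metadata, first quotes) is now reached through a per-fqsn `Flume` via `feed.flumes[fqsn]`. A minimal sketch of the new lookup pattern, for orientation only — the `piker.data.feed` import path and the wrapper function are assumptions, not part of this commit; the attribute names are taken from the hunks below:

    from piker.data.feed import maybe_open_feed  # assumed import path

    async def show_flume_lookups(fqsn: str) -> None:
        # hypothetical helper, for illustration only
        async with maybe_open_feed([fqsn]) as feed:
            # one `Flume` per fully-qualified symbol name
            flume = feed.flumes[fqsn]
            first_quote: dict = flume.first_quote        # was `feed.first_quotes[fqsn]`
            last = flume.rt_shm.array[-1]['close']       # was `feed.rt_shm`
            tick_size = flume.symbol.tick_size           # was `feed.symbols[fqsn].tick_size`
            async for quotes in flume.stream:            # replaces the separate `quote_stream` handle
                ...                                      # process real-time quotes here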

View File

@@ -48,6 +48,7 @@ from ..data._source import (
 )
 from ..data.feed import (
     Feed,
+    Flume,
     maybe_open_feed,
 )
 from ..ui._notify import notify_from_ems_status_msg
@@ -523,13 +524,14 @@ class Router(Struct):
             maybe_open_feed(
                 [fqsn],
                 loglevel=loglevel,
-            ) as (feed, quote_stream),
+            ) as feed,
         ):
             brokermod = feed.mod
             broker = brokermod.name

             # XXX: this should be initial price quote from target provider
-            first_quote: dict = feed.first_quotes[fqsn]
+            flume = feed.flumes[fqsn]
+            first_quote: dict = flume.first_quote

             book: DarkBook = self.get_dark_book(broker)
             book.lasts[fqsn]: float = first_quote['last']
@@ -547,14 +549,16 @@ class Router(Struct):
                 clear_dark_triggers,
                 self,
                 relay.brokerd_stream,
-                quote_stream,
+                flume.stream,
                 broker,
                 fqsn,  # form: <name>.<venue>.<suffix>.<broker>
                 book
             )

             client_ready = trio.Event()
-            task_status.started((relay, feed, client_ready))
+            task_status.started(
+                (relay, feed, client_ready)
+            )

             # sync to the client side by waiting for the stream
             # connection setup before relaying any existing live
@@ -1014,7 +1018,7 @@ async def process_client_order_cmds(
     brokerd_order_stream: tractor.MsgStream,

     fqsn: str,
-    feed: Feed,
+    flume: Flume,
     dark_book: DarkBook,
     router: Router,
@@ -1212,7 +1216,7 @@ async def process_client_order_cmds(
                 'size': size,
                 'exec_mode': exec_mode,
                 'action': action,
-                'brokers': brokers,  # list
+                'brokers': _,  # list
             } if (
                 # "DARK" triggers
                 # submit order to local EMS book and scan loop,
@@ -1234,12 +1238,12 @@ async def process_client_order_cmds(
                 # sometimes the real-time feed hasn't come up
                 # so just pull from the latest history.
                 if isnan(last):
-                    last = feed.rt_shm.array[-1]['close']
+                    last = flume.rt_shm.array[-1]['close']

                 pred = mk_check(trigger_price, last, action)

                 spread_slap: float = 5
-                min_tick = feed.symbols[fqsn].tick_size
+                min_tick = flume.symbol.tick_size

                 if action == 'buy':
                     tickfilter = ('ask', 'last', 'trade')
@@ -1452,11 +1456,12 @@ async def _emsd_main(
         # start inbound (from attached client) order request processing
         # main entrypoint, run here until cancelled.
         try:
+            flume = feed.flumes[fqsn]
             await process_client_order_cmds(
                 client_stream,
                 brokerd_stream,
                 fqsn,
-                feed,
+                flume,
                 dark_book,
                 _router,
             )

View File

@@ -578,7 +578,7 @@ async def trades_dialogue(
         )

         # paper engine simulator clearing task
-        await simulate_fills(feed.stream, client)
+        await simulate_fills(feed.streams[broker], client)


 @asynccontextmanager
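The paper-engine hunk above also shows the feed-level counterpart to the per-fqsn flumes: quote streams are now looked up per broker rather than through a single `feed.stream` handle. Spelled out (the `feed`, `broker` and `client` names come from the surrounding function; the intermediate variable is added here only for clarity):

    # per-broker quote stream lookup replaces the old single `feed.stream` handle
    quote_stream = feed.streams[broker]
    await simulate_fills(quote_stream, client)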

View File

@@ -26,7 +26,6 @@ from typing import (
 )

 import numpy as np
-import pyqtgraph as pg
 import trio
 from trio_typing import TaskStatus
 import tractor
@@ -35,7 +34,9 @@ from tractor.msg import NamespacePath
 from ..log import get_logger, get_console_log
 from .. import data
 from ..data import attach_shm_array
-from ..data.feed import Feed
+from ..data.feed import (
+    Flume,
+)
 from ..data._sharedmem import ShmArray
 from ..data._sampling import _default_delay_s
 from ..data._source import Symbol
@@ -79,7 +80,7 @@ async def filter_quotes_by_sym(
 async def fsp_compute(

     symbol: Symbol,
-    feed: Feed,
+    flume: Flume,
     quote_stream: trio.abc.ReceiveChannel,

     src: ShmArray,
@@ -107,7 +108,7 @@ async def fsp_compute(
         filter_quotes_by_sym(fqsn, quote_stream),

         # XXX: currently the ``ohlcv`` arg
-        feed.rt_shm,
+        flume.rt_shm,
     )

     # Conduct a single iteration of fsp with historical bars input
@@ -310,12 +311,12 @@ async def cascade(
             # needs to get throttled the ticks we generate.
             # tick_throttle=60,

-        ) as (feed, quote_stream):
-            symbol = feed.symbols[fqsn]
+        ) as feed:
+            flume = feed.flumes[fqsn]
+            symbol = flume.symbol
+            assert src.token == flume.rt_shm.token

             profiler(f'{func}: feed up')

-            assert src.token == feed.rt_shm.token
             # last_len = new_len = len(src.array)

             func_name = func.__name__
@@ -327,8 +328,8 @@ async def cascade(

                 fsp_compute,
                 symbol=symbol,
-                feed=feed,
-                quote_stream=quote_stream,
+                flume=flume,
+                quote_stream=flume.stream,

                 # shm
                 src=src,
@@ -430,7 +431,7 @@ async def cascade(

             # Increment the underlying shared memory buffer on every
             # "increment" msg received from the underlying data feed.
-            async with feed.index_stream(
+            async with flume.index_stream(
                 int(delay_s)
             ) as istream:


View File

@@ -915,14 +915,16 @@ class ChartPlotWidget(pg.PlotWidget):
     def resume_all_feeds(self):
         try:
             for feed in self._feeds.values():
-                self.linked.godwidget._root_n.start_soon(feed.resume)
+                for flume in feed.flumes.values():
+                    self.linked.godwidget._root_n.start_soon(flume.resume)
         except RuntimeError:
             # TODO: cancel the qtractor runtime here?
             raise

     def pause_all_feeds(self):
         for feed in self._feeds.values():
-            self.linked.godwidget._root_n.start_soon(feed.pause)
+            for flume in feed.flumes.values():
+                self.linked.godwidget._root_n.start_soon(flume.pause)

     @property
     def view(self) -> ChartView:

View File

@@ -33,6 +33,7 @@ import pyqtgraph as pg
 from ..data.feed import (
     open_feed,
     Feed,
+    Flume,
 )
 from ..data.types import Struct
 from ._axes import YAxisLabel
@@ -228,7 +229,7 @@ async def graphics_update_loop(

     nurse: trio.Nursery,
     godwidget: GodWidget,
-    feed: Feed,
+    flume: Flume,
     wap_in_history: bool = False,
     vlm_chart: Optional[ChartPlotWidget] = None,

@@ -255,8 +256,8 @@ async def graphics_update_loop(
     fast_chart = linked.chart
     hist_chart = godwidget.hist_linked.chart

-    ohlcv = feed.rt_shm
-    hist_ohlcv = feed.hist_shm
+    ohlcv = flume.rt_shm
+    hist_ohlcv = flume.hist_shm

     # update last price sticky
     last_price_sticky = fast_chart._ysticks[fast_chart.name]
@@ -347,9 +348,9 @@ async def graphics_update_loop(
         'i_last_append': i_last,
         'i_last': i_last,
     }
-    _, hist_step_size_s, _ = feed.get_ds_info()
+    _, hist_step_size_s, _ = flume.get_ds_info()

-    async with feed.index_stream(
+    async with flume.index_stream(
         # int(hist_step_size_s)
         # TODO: seems this is more reliable at keeping the slow
         # chart incremented in view more correctly?
@@ -393,7 +394,7 @@ async def graphics_update_loop(
     nurse.start_soon(increment_history_view)

     # main real-time quotes update loop
-    stream: tractor.MsgStream = feed.stream
+    stream: tractor.MsgStream = flume.stream
     async for quotes in stream:
         ds.quotes = quotes

@@ -813,13 +814,13 @@ def graphics_update_cycle(
 async def link_views_with_region(
     rt_chart: ChartPlotWidget,
     hist_chart: ChartPlotWidget,
-    feed: Feed,
+    flume: Flume,

 ) -> None:

     # these value are be only pulled once during shm init/startup
-    izero_hist = feed.izero_hist
-    izero_rt = feed.izero_rt
+    izero_hist = flume.izero_hist
+    izero_rt = flume.izero_rt

     # Add the LinearRegionItem to the ViewBox, but tell the ViewBox
     # to exclude this item when doing auto-range calculations.
@@ -846,7 +847,7 @@ async def link_views_with_region(
     # poll for datums load and timestep detection
     for _ in range(100):
         try:
-            _, _, ratio = feed.get_ds_info()
+            _, _, ratio = flume.get_ds_info()
             break
         except IndexError:
             await trio.sleep(0.01)
@@ -977,8 +978,7 @@ async def display_symbol_data(
         group_key=True
     )

-    first_fqsn = fqsns[0]
-
+    feed: Feed
     async with open_feed(
         fqsns,
         loglevel=loglevel,
@@ -988,11 +988,17 @@ async def display_symbol_data(
         tick_throttle=_quote_throttle_rate,

     ) as feed:

-        ohlcv: ShmArray = feed.rt_shm
-        hist_ohlcv: ShmArray = feed.hist_shm
-        symbol = feed.symbols[first_fqsn]
-        fqsn = symbol.front_fqsn()
+        # TODO: right now we only show one symbol on charts, but
+        # overlays are coming muy pronto guey..
+        assert len(feed.flumes) == 1
+        flume = list(feed.flumes.values())[0]
+
+        ohlcv: ShmArray = flume.rt_shm
+        hist_ohlcv: ShmArray = flume.hist_shm
+
+        symbol = flume.symbol
+        fqsn = symbol.fqsn

         step_size_s = 1
         tf_key = tf_in_1s[step_size_s]
@@ -1012,7 +1018,7 @@ async def display_symbol_data(
         hist_linked._symbol = symbol
         hist_chart = hist_linked.plot_ohlc_main(
             symbol,
-            feed.hist_shm,
+            hist_ohlcv,
             # in the case of history chart we explicitly set `False`
             # to avoid internal pane creation.
             # sidepane=False,
@@ -1100,7 +1106,7 @@ async def display_symbol_data(
             graphics_update_loop,
             ln,
             godwidget,
-            feed,
+            flume,
             wap_in_history,
             vlm_chart,
         )
@@ -1124,7 +1130,7 @@ async def display_symbol_data(
         await link_views_with_region(
             ohlc_chart,
             hist_chart,
-            feed,
+            flume,
         )

         mode: OrderMode

View File

@@ -45,7 +45,10 @@ from ..calc import humanize, pnl, puterize
 from ..clearing._allocate import Allocator
 from ..pp import Position
 from ..data._normalize import iterticks
-from ..data.feed import Feed
+from ..data.feed import (
+    Feed,
+    Flume,
+)
 from ..data.types import Struct
 from ._label import Label
 from ._lines import LevelLine, order_line
@@ -64,7 +67,7 @@ _pnl_tasks: dict[str, bool] = {}

 async def update_pnl_from_feed(

-    feed: Feed,
+    flume: Flume,
     order_mode: OrderMode,  # noqa
     tracker: PositionTracker,

@@ -95,7 +98,7 @@ async def update_pnl_from_feed(

     # real-time update pnl on the status pane
     try:
-        async with feed.stream.subscribe() as bstream:
+        async with flume.stream.subscribe() as bstream:
             # last_tick = time.time()
             async for quotes in bstream:

@@ -390,12 +393,12 @@ class SettingsPane:
         mode = self.order_mode
         sym = mode.chart.linked.symbol
         size = tracker.live_pp.size
-        feed = mode.quote_feed
+        flume: Flume = mode.feed.flumes[sym.fqsn]
         pnl_value = 0

         if size:
             # last historical close price
-            last = feed.rt_shm.array[-1][['close']][0]
+            last = flume.rt_shm.array[-1][['close']][0]
             pnl_value = copysign(1, size) * pnl(
                 tracker.live_pp.ppu,
                 last,
@@ -408,7 +411,7 @@ class SettingsPane:
                 _pnl_tasks[fqsn] = True
                 self.order_mode.nursery.start_soon(
                     update_pnl_from_feed,
-                    feed,
+                    flume,
                     mode,
                     tracker,
                 )

View File

@@ -44,7 +44,10 @@ from ..clearing._allocate import (
 )
 from ._style import _font
 from ..data._source import Symbol
-from ..data.feed import Feed
+from ..data.feed import (
+    Feed,
+    Flume,
+)
 from ..data.types import Struct
 from ..log import get_logger
 from ._editors import LineEditor, ArrowEditor
@@ -118,7 +121,6 @@ class OrderMode:
     chart: ChartPlotWidget  #  type: ignore # noqa
     hist_chart: ChartPlotWidget  # type: ignore # noqa
     nursery: trio.Nursery  # used by ``ui._position`` code?
-    quote_feed: Feed
     book: OrderBook
     lines: LineEditor
     arrows: ArrowEditor
@@ -514,12 +516,13 @@ class OrderMode:
         # XXX: seems to fail on certain types of races?
         # assert len(lines) == 2
         if lines:
-            _, _, ratio = self.feed.get_ds_info()
+            flume: Flume = self.feed.flumes[self.chart.linked.symbol.fqsn]
+            _, _, ratio = flume.get_ds_info()
             for i, chart in [
                 (arrow_index, self.chart),
-                (self.feed.izero_hist
+                (flume.izero_hist
                  +
-                 round((arrow_index - self.feed.izero_rt)/ratio),
+                 round((arrow_index - flume.izero_rt)/ratio),
                  self.hist_chart)
             ]:
                 self.arrows.add(
@@ -801,7 +804,6 @@ async def open_order_mode(
         chart,
         hist_chart,
         tn,
-        feed,
         book,
         lines,
         arrows,