diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 82e08507..9369a73b 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -20,7 +20,6 @@ Real-time data feed machinery import time from functools import partial from dataclasses import dataclass, field -from itertools import cycle import socket import json from types import ModuleType @@ -31,7 +30,6 @@ from typing import ( Sequence ) import contextlib -from operator import itemgetter import trio import tractor @@ -182,6 +180,8 @@ async def symbol_data(broker: str, tickers: List[str]): _feeds_cache = {} + +# TODO: use the version of this from .api ? @asynccontextmanager async def get_cached_feed( brokername: str, @@ -326,6 +326,7 @@ class DataFeed: self.quote_gen = None self._symbol_data_cache: Dict[str, Any] = {} + @asynccontextmanager async def open_stream( self, symbols: Sequence[str], @@ -351,40 +352,32 @@ class DataFeed: # subscribe for tickers (this performs a possible filtering # where invalid symbols are discarded) sd = await self.portal.run( - "piker.brokers.data", - 'symbol_data', + symbol_data, broker=self.brokermod.name, tickers=symbols ) self._symbol_data_cache.update(sd) - if test: - # stream from a local test file - quote_gen = await self.portal.run( - "piker.brokers.data", - 'stream_from_file', - filename=test, - ) - else: - log.info(f"Starting new stream for {symbols}") - # start live streaming from broker daemon - quote_gen = await self.portal.run( - "piker.brokers.data", - 'start_quote_stream', - broker=self.brokermod.name, - symbols=symbols, - feed_type=feed_type, - rate=rate, - ) + log.info(f"Starting new stream for {symbols}") - # get first quotes response - log.debug(f"Waiting on first quote for {symbols}...") - quotes = {} - quotes = await quote_gen.__anext__() + # start live streaming from broker daemon + async with self.portal.open_stream_from( + start_quote_stream, + broker=self.brokermod.name, + symbols=symbols, + feed_type=feed_type, + rate=rate, + ) as quote_gen: + + # get first quotes response + log.debug(f"Waiting on first quote for {symbols}...") + quotes = {} + quotes = await quote_gen.__anext__() + + self.quote_gen = quote_gen + self.first_quotes = quotes + yield quote_gen, quotes - self.quote_gen = quote_gen - self.first_quotes = quotes - return quote_gen, quotes except Exception: if self.quote_gen: await self.quote_gen.aclose() @@ -406,8 +399,7 @@ class DataFeed: """Call a broker ``Client`` method using RPC and return result. """ return await self.portal.run( - 'piker.brokers.data', - 'call_client', + call_client, broker=self.brokermod.name, methname=method, **kwargs @@ -425,27 +417,29 @@ async def stream_to_file( """Record client side received quotes to file ``filename``. """ # an async generator instance - agen = await portal.run( - "piker.brokers.data", 'start_quote_stream', - broker=brokermod.name, symbols=tickers) + async with portal.open_stream_from( + start_quote_stream, + broker=brokermod.name, + symbols=tickers + ) as agen: - fname = filename or f'{watchlist_name}.jsonstream' - with open(fname, 'a') as f: - async for quotes in agen: - f.write(json.dumps(quotes)) - f.write('\n--\n') + fname = filename or f'{watchlist_name}.jsonstream' + with open(fname, 'a') as f: + async for quotes in agen: + f.write(json.dumps(quotes)) + f.write('\n--\n') - return fname + return fname -async def stream_from_file( - filename: str, -): - with open(filename, 'r') as quotes_file: - content = quotes_file.read() +# async def stream_from_file( +# filename: str, +# ): +# with open(filename, 'r') as quotes_file: +# content = quotes_file.read() - pkts = content.split('--')[:-1] # simulate 2 separate quote packets - payloads = [json.loads(pkt) for pkt in pkts] - for payload in cycle(payloads): - yield payload - await trio.sleep(0.3) +# pkts = content.split('--')[:-1] # simulate 2 separate quote packets +# payloads = [json.loads(pkt) for pkt in pkts] +# for payload in cycle(payloads): +# yield payload +# await trio.sleep(0.3) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 6138086c..b643b952 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -246,23 +246,24 @@ async def open_ems( async with maybe_open_emsd(broker) as portal: - trades_stream = await portal.run( + async with portal.open_stream_from( + _emsd_main, client_actor_name=actor.name, broker=broker, symbol=symbol.key, - ) - with trio.fail_after(10): - await book._ready_to_receive.wait() + ) as trades_stream: + with trio.fail_after(10): + await book._ready_to_receive.wait() - try: - yield book, trades_stream + try: + yield book, trades_stream - finally: - # TODO: we want to eventually keep this up (by having - # the exec loop keep running in the pikerd tree) but for - # now we have to kill the context to avoid backpressure - # build-up on the shm write loop. - with trio.CancelScope(shield=True): - await trades_stream.aclose() + finally: + # TODO: we want to eventually keep this up (by having + # the exec loop keep running in the pikerd tree) but for + # now we have to kill the context to avoid backpressure + # build-up on the shm write loop. + with trio.CancelScope(shield=True): + await trades_stream.aclose() diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 52fab921..73ca9ee1 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -339,130 +339,131 @@ async def process_broker_trades( """ broker = feed.mod.name - with trio.fail_after(5): - # in the paper engine case this is just a mem receive channel - trades_stream = await feed.recv_trades_data() + # TODO: make this a context + # in the paper engine case this is just a mem receive channel + async with feed.receive_trades_data() as trades_stream: first = await trades_stream.__anext__() - # startup msg expected as first from broker backend - assert first['local_trades'] == 'start' - task_status.started() + # startup msg expected as first from broker backend + assert first['local_trades'] == 'start' + task_status.started() - async for event in trades_stream: + async for event in trades_stream: - name, msg = event['local_trades'] + name, msg = event['local_trades'] - log.info(f'Received broker trade event:\n{pformat(msg)}') + log.info(f'Received broker trade event:\n{pformat(msg)}') - if name == 'position': - msg['resp'] = 'position' - - # relay through - await ctx.send_yield(msg) - continue - - # Get the broker (order) request id, this **must** be normalized - # into messaging provided by the broker backend - reqid = msg['reqid'] - - # make response packet to EMS client(s) - oid = book._broker2ems_ids.get(reqid) - - if oid is None: - # paper engine race case: ``Client.submit_limit()`` hasn't - # returned yet and provided an output reqid to register - # locally, so we need to retreive the oid that was already - # packed at submission since we already know it ahead of - # time - paper = msg.get('paper_info') - if paper: - oid = paper['oid'] - - else: - msg.get('external') - if not msg: - log.error(f"Unknown trade event {event}") + if name == 'position': + msg['resp'] = 'position' + # relay through + await ctx.send_yield(msg) continue - resp = { - 'resp': None, # placeholder - 'oid': oid - } + # Get the broker (order) request id, this **must** be normalized + # into messaging provided by the broker backend + reqid = msg['reqid'] - if name in ( - 'error', - ): - # TODO: figure out how this will interact with EMS clients - # for ex. on an error do we react with a dark orders - # management response, like cancelling all dark orders? + # make response packet to EMS client(s) + oid = book._broker2ems_ids.get(reqid) - # This looks like a supervision policy for pending orders on - # some unexpected failure - something we need to think more - # about. In most default situations, with composed orders - # (ex. brackets), most brokers seem to use a oca policy. + if oid is None: + # paper engine race case: ``Client.submit_limit()`` hasn't + # returned yet and provided an output reqid to register + # locally, so we need to retreive the oid that was already + # packed at submission since we already know it ahead of + # time + paper = msg.get('paper_info') + if paper: + oid = paper['oid'] - message = msg['message'] - - # XXX should we make one when it's blank? - log.error(pformat(message)) - - # TODO: getting this bs, prolly need to handle status messages - # 'Market data farm connection is OK:usfarm.nj' - - # another stupid ib error to handle - # if 10147 in message: cancel - - # don't relay message to order requester client - continue - - elif name in ( - 'status', - ): - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # everyone doin camel case - status = msg['status'].lower() - - if status == 'filled': - - # conditional execution is fully complete, no more - # fills for the noted order - if not msg['remaining']: - - resp['resp'] = 'broker_executed' - - log.info(f'Execution for {oid} is complete!') - - # just log it else: - log.info(f'{broker} filled {msg}') + msg.get('external') + if not msg: + log.error(f"Unknown trade event {event}") - else: - # one of (submitted, cancelled) - resp['resp'] = 'broker_' + status + continue - elif name in ( - 'fill', - ): - # proxy through the "fill" result(s) - resp['resp'] = 'broker_filled' - resp.update(msg) + resp = { + 'resp': None, # placeholder + 'oid': oid + } - log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') + if name in ( + 'error', + ): + # TODO: figure out how this will interact with EMS clients + # for ex. on an error do we react with a dark orders + # management response, like cancelling all dark orders? - # respond to requesting client - await ctx.send_yield(resp) + # This looks like a supervision policy for pending orders on + # some unexpected failure - something we need to think more + # about. In most default situations, with composed orders + # (ex. brackets), most brokers seem to use a oca policy. + + message = msg['message'] + + # XXX should we make one when it's blank? + log.error(pformat(message)) + + # TODO: getting this bs, prolly need to handle status messages + # 'Market data farm connection is OK:usfarm.nj' + + # another stupid ib error to handle + # if 10147 in message: cancel + + # don't relay message to order requester client + continue + + elif name in ( + 'status', + ): + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) + + # everyone doin camel case + status = msg['status'].lower() + + if status == 'filled': + + # conditional execution is fully complete, no more + # fills for the noted order + if not msg['remaining']: + + resp['resp'] = 'broker_executed' + + log.info(f'Execution for {oid} is complete!') + + # just log it + else: + log.info(f'{broker} filled {msg}') + + else: + # one of (submitted, cancelled) + resp['resp'] = 'broker_' + status + + elif name in ( + 'fill', + ): + # proxy through the "fill" result(s) + resp['resp'] = 'broker_filled' + resp.update(msg) + + log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') + + # respond to requesting client + await ctx.send_yield(resp) async def process_order_cmds( @@ -675,17 +676,17 @@ async def _emsd_main( # acting as an EMS client and will submit orders) to # receive requests pushed over a tractor stream # using (for now) an async generator. - order_stream = await portal.run( + async with portal.open_stream_from( send_order_cmds, symbol_key=symbol, - ) + ) as order_stream: - # start inbound order request processing - await process_order_cmds( - ctx, - order_stream, - symbol, - feed, - client, - dark_book, - ) + # start inbound order request processing + await process_order_cmds( + ctx, + order_stream, + symbol, + feed, + client, + dark_book, + ) diff --git a/piker/data/feed.py b/piker/data/feed.py index d9ce062e..9455f7ae 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -51,6 +51,7 @@ from ._sampling import ( iter_ohlc_periods, sample_and_broadcast, ) +from .ingest import get_ingestormod log = get_logger(__name__) @@ -302,6 +303,7 @@ class Feed: async def receive(self) -> dict: return await self.stream.__anext__() + @asynccontextmanager async def index_stream( self, delay_s: Optional[int] = None @@ -312,14 +314,16 @@ class Feed: # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes - self._index_stream = await self._brokerd_portal.run( + async with self._brokerd_portal.open_stream_from( iter_ohlc_periods, delay_s=delay_s or self._max_sample_rate, - ) + ) as self._index_stream: + yield self._index_stream + else: + yield self._index_stream - return self._index_stream - - async def recv_trades_data(self) -> AsyncIterator[dict]: + @asynccontextmanager + async def receive_trades_data(self) -> AsyncIterator[dict]: if not getattr(self.mod, 'stream_trades', False): log.warning( @@ -333,7 +337,7 @@ class Feed: # using the ``_.set_fake_trades_stream()`` method if self._trade_stream is None: - self._trade_stream = await self._brokerd_portal.run( + async with self._brokerd_portal.open_stream_from( self.mod.stream_trades, @@ -342,9 +346,10 @@ class Feed: # in messages, though we could probably use # more then one? topics=['local_trades'], - ) - - return self._trade_stream + ) as self._trade_stream: + yield self._trade_stream + else: + yield self._trade_stream def sym_to_shm_key( @@ -373,64 +378,64 @@ async def open_feed( # TODO: do all! sym = symbols[0] - async with maybe_spawn_brokerd( - brokername, - loglevel=loglevel, - ) as portal: + # TODO: compress these to one line with py3.9+ + async with maybe_spawn_brokerd(brokername, loglevel=loglevel) as portal: + + async with portal.open_stream_from( - stream = await portal.run( attach_feed_bus, brokername=brokername, symbol=sym, - loglevel=loglevel, - ) + loglevel=loglevel - # TODO: can we make this work better with the proposed - # context based bidirectional streaming style api proposed in: - # https://github.com/goodboy/tractor/issues/53 - init_msg = await stream.receive() + ) as stream: - # we can only read from shm - shm = attach_shm_array( - token=init_msg[sym]['shm_token'], - readonly=True, - ) + # TODO: can we make this work better with the proposed + # context based bidirectional streaming style api proposed in: + # https://github.com/goodboy/tractor/issues/53 + init_msg = await stream.receive() - feed = Feed( - name=brokername, - stream=stream, - shm=shm, - mod=mod, - _brokerd_portal=portal, - ) - ohlc_sample_rates = [] - - for sym, data in init_msg.items(): - - si = data['symbol_info'] - ohlc_sample_rates.append(data['sample_rate']) - - symbol = Symbol( - key=sym, - type_key=si.get('asset_type', 'forex'), - tick_size=si.get('price_tick_size', 0.01), - lot_tick_size=si.get('lot_tick_size', 0.0), + # we can only read from shm + shm = attach_shm_array( + token=init_msg[sym]['shm_token'], + readonly=True, ) - symbol.broker_info[brokername] = si - feed.symbols[sym] = symbol + feed = Feed( + name=brokername, + stream=stream, + shm=shm, + mod=mod, + _brokerd_portal=portal, + ) + ohlc_sample_rates = [] - # cast shm dtype to list... can't member why we need this - shm_token = data['shm_token'] - shm_token['dtype_descr'] = list(shm_token['dtype_descr']) - assert shm_token == shm.token # sanity + for sym, data in init_msg.items(): - feed._max_sample_rate = max(ohlc_sample_rates) + si = data['symbol_info'] + ohlc_sample_rates.append(data['sample_rate']) - try: - yield feed + symbol = Symbol( + key=sym, + type_key=si.get('asset_type', 'forex'), + tick_size=si.get('price_tick_size', 0.01), + lot_tick_size=si.get('lot_tick_size', 0.0), + ) + symbol.broker_info[brokername] = si - finally: - # always cancel the far end producer task - with trio.CancelScope(shield=True): - await stream.aclose() + feed.symbols[sym] = symbol + + # cast shm dtype to list... can't member why we need this + shm_token = data['shm_token'] + shm_token['dtype_descr'] = list(shm_token['dtype_descr']) + assert shm_token == shm.token # sanity + + feed._max_sample_rate = max(ohlc_sample_rates) + + try: + yield feed + + finally: + # always cancel the far end producer task + with trio.CancelScope(shield=True): + await stream.aclose() diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 0b432e5b..2345b516 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -167,24 +167,25 @@ async def cascade( # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. - async for msg in await feed.index_stream(): + async with feed.index_stream() as stream: + async for msg in stream: - new_len = len(src.array) + new_len = len(src.array) - if new_len > last_len + 1: - # respawn the signal compute task if the source - # signal has been updated - cs.cancel() - cs = await n.start(fsp_compute) + if new_len > last_len + 1: + # respawn the signal compute task if the source + # signal has been updated + cs.cancel() + cs = await n.start(fsp_compute) - # TODO: adopt an incremental update engine/approach - # where possible here eventually! + # TODO: adopt an incremental update engine/approach + # where possible here eventually! - # read out last shm row - array = dst.array - last = array[-1:].copy() + # read out last shm row + array = dst.array + last = array[-1:].copy() - # write new row to the shm buffer - dst.push(last) + # write new row to the shm buffer + dst.push(last) - last_len = new_len + last_len = new_len diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index f9aefc34..0650663c 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -19,6 +19,7 @@ High level Qt chart widgets. """ from typing import Tuple, Dict, Any, Optional, Callable +from types import ModuleType from functools import partial from PyQt5 import QtCore, QtGui @@ -26,6 +27,7 @@ import numpy as np import pyqtgraph as pg import tractor import trio +from trio_typing import TaskStatus from ._axes import ( DynamicDateAxis, @@ -53,6 +55,7 @@ from ._style import ( _bars_to_left_in_follow_mode, ) from ..data._source import Symbol +from ..data._sharedmem import ShmArray from .. import brokers from .. import data from ..data import maybe_open_shm_array @@ -128,7 +131,8 @@ class ChartSpace(QtGui.QWidget): # self.toolbar_layout.addWidget(self.strategy_box) def load_symbol( self, - symbol: Symbol, + brokername: str, + symbol_key: str, data: np.ndarray, ohlc: bool = True, ) -> None: @@ -136,12 +140,6 @@ class ChartSpace(QtGui.QWidget): Expects a ``numpy`` structured array containing all the ohlcv fields. """ - # XXX: let's see if this causes mem problems - self.window.setWindowTitle( - f'{symbol.key}@{symbol.brokers} ' - f'tick:{symbol.tick_size}' - ) - # TODO: symbol search # # of course this doesn't work :eyeroll: # h = _font.boundingRect('Ag').height() @@ -151,19 +149,18 @@ class ChartSpace(QtGui.QWidget): # self.symbol_label.setText(f'/`{symbol}`') linkedcharts = self._chart_cache.setdefault( - symbol.key, - LinkedSplitCharts(symbol) + symbol_key, + LinkedSplitCharts(self) ) + self.linkedcharts = linkedcharts # remove any existing plots if not self.v_layout.isEmpty(): self.v_layout.removeWidget(linkedcharts) - main_chart = linkedcharts.plot_ohlc_main(symbol, data) - self.v_layout.addWidget(linkedcharts) - return linkedcharts, main_chart + return linkedcharts # TODO: add signalling painter system # def add_signals(self): @@ -187,13 +184,14 @@ class LinkedSplitCharts(QtGui.QWidget): def __init__( self, - symbol: Symbol, + chart_space: ChartSpace, ) -> None: super().__init__() self.signals_visible: bool = False self._cursor: Cursor = None # crosshair graphics self.chart: ChartPlotWidget = None # main (ohlc) chart self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {} + self.chart_space = chart_space self.xaxis = DynamicDateAxis( orientation='bottom', @@ -215,7 +213,7 @@ class LinkedSplitCharts(QtGui.QWidget): self.layout.addWidget(self.splitter) # state tracker? - self._symbol: Symbol = symbol + self._symbol: Symbol = None @property def symbol(self) -> Symbol: @@ -939,135 +937,6 @@ async def test_bed( # rlabel.setPos(vb_right - 2*w, d_coords.y()) -async def _async_main( - # implicit required argument provided by ``qtractor_run()`` - widgets: Dict[str, Any], - - sym: str, - brokername: str, - loglevel: str, - -) -> None: - """Main Qt-trio routine invoked by the Qt loop with - the widgets ``dict``. - """ - chart_app = widgets['main'] - - # attempt to configure DPI aware font size - _font.configure_to_dpi(current_screen()) - - # chart_app.init_search() - - # historical data fetch - brokermod = brokers.get_brokermod(brokername) - - async with data.open_feed( - brokername, - [sym], - loglevel=loglevel, - ) as feed: - - ohlcv = feed.shm - bars = ohlcv.array - symbol = feed.symbols[sym] - - # load in symbol's ohlc data - linked_charts, chart = chart_app.load_symbol(symbol, bars) - - # plot historical vwap if available - wap_in_history = False - - if brokermod._show_wap_in_history: - - if 'bar_wap' in bars.dtype.fields: - wap_in_history = True - chart.draw_curve( - name='bar_wap', - data=bars, - add_label=False, - ) - - # size view to data once at outset - chart._set_yrange() - - # TODO: a data view api that makes this less shit - chart._shm = ohlcv - - # TODO: eventually we'll support some kind of n-compose syntax - fsp_conf = { - 'rsi': { - 'period': 14, - 'chart_kwargs': { - 'static_yrange': (0, 100), - }, - }, - - } - - # make sure that the instrument supports volume history - # (sometimes this is not the case for some commodities and - # derivatives) - volm = ohlcv.array['volume'] - if ( - np.all(np.isin(volm, -1)) or - np.all(np.isnan(volm)) - ): - log.warning( - f"{sym} does not seem to have volume info," - " dropping volume signals") - else: - fsp_conf.update({ - 'vwap': { - 'overlay': True, - 'anchor': 'session', - }, - }) - - async with trio.open_nursery() as n: - - # load initial fsp chain (otherwise known as "indicators") - n.start_soon( - spawn_fsps, - linked_charts, - fsp_conf, - sym, - ohlcv, - brokermod, - loglevel, - ) - - # start graphics update loop(s)after receiving first live quote - n.start_soon( - chart_from_quotes, - chart, - feed.stream, - ohlcv, - wap_in_history, - ) - - # wait for a first quote before we start any update tasks - quote = await feed.receive() - - log.info(f'Received first quote {quote}') - - n.start_soon( - check_for_new_bars, - feed, - # delay, - ohlcv, - linked_charts - ) - - # interactive testing - # n.start_soon( - # test_bed, - # ohlcv, - # chart, - # linked_charts, - # ) - await start_order_mode(chart, symbol, brokername) - - async def chart_from_quotes( chart: ChartPlotWidget, stream, @@ -1245,7 +1114,7 @@ async def spawn_fsps( """ # spawns sub-processes which execute cpu bound FSP code - async with tractor.open_nursery() as n: + async with tractor.open_nursery(loglevel=loglevel) as n: # spawns local task that consume and chart data streams from # sub-procs @@ -1280,66 +1149,36 @@ async def spawn_fsps( conf['shm'] = shm - # spawn closure, can probably define elsewhere - async def spawn_fsp_daemon( - fsp_name: str, - display_name: str, - conf: dict, - ): - """Start an fsp subactor async. + portal = await n.start_actor( + enable_modules=['piker.fsp'], + name=display_name, + ) - """ - # print(f'FSP NAME: {fsp_name}') - portal = await n.run_in_actor( - - # subactor entrypoint - fsp.cascade, - - # name as title of sub-chart - name=display_name, - brokername=brokermod.name, - src_shm_token=src_shm.token, - dst_shm_token=conf['shm'].token, - symbol=sym, - fsp_func_name=fsp_name, - - # tractor config - loglevel=loglevel, - ) - - stream = await portal.result() - - # receive last index for processed historical - # data-array as first msg - _ = await stream.receive() - - conf['stream'] = stream - conf['portal'] = portal - - # new local task + # init async ln.start_soon( - spawn_fsp_daemon, + run_fsp, + portal, + linked_charts, + brokermod, + sym, + src_shm, fsp_func_name, display_name, conf, ) - # blocks here until all daemons up - - # start and block on update loops - async with trio.open_nursery() as ln: - for fsp_func_name, conf in fsps.items(): - ln.start_soon( - update_signals, - linked_charts, - fsp_func_name, - conf, - ) + # blocks here until all fsp actors complete -async def update_signals( +async def run_fsp( + + portal: tractor._portal.Portal, linked_charts: LinkedSplitCharts, + brokermod: ModuleType, + sym: str, + src_shm: ShmArray, fsp_func_name: str, + display_name: str, conf: Dict[str, Any], ) -> None: @@ -1348,96 +1187,117 @@ async def update_signals( This is called once for each entry in the fsp config map. """ - shm = conf['shm'] + async with portal.open_stream_from( - if conf.get('overlay'): - chart = linked_charts.chart - chart.draw_curve( - name='vwap', - data=shm.array, - overlay=True, - ) - last_val_sticky = None + # subactor entrypoint + fsp.cascade, - else: + # name as title of sub-chart + brokername=brokermod.name, + src_shm_token=src_shm.token, + dst_shm_token=conf['shm'].token, + symbol=sym, + fsp_func_name=fsp_func_name, - chart = linked_charts.add_plot( - name=fsp_func_name, - array=shm.array, + ) as stream: - # curve by default - ohlc=False, + # receive last index for processed historical + # data-array as first msg + _ = await stream.receive() - # settings passed down to ``ChartPlotWidget`` - **conf.get('chart_kwargs', {}) - # static_yrange=(0, 100), - ) + conf['stream'] = stream + conf['portal'] = portal - # display contents labels asap - chart.update_contents_labels( - len(shm.array) - 1, - # fsp_func_name - ) + shm = conf['shm'] - # read last value - array = shm.array - value = array[fsp_func_name][-1] + if conf.get('overlay'): + chart = linked_charts.chart + chart.draw_curve( + name='vwap', + data=shm.array, + overlay=True, + ) + last_val_sticky = None - last_val_sticky = chart._ysticks[chart.name] - last_val_sticky.update_from_data(-1, value) + else: - chart.update_curve_from_array(fsp_func_name, array) + chart = linked_charts.add_plot( + name=fsp_func_name, + array=shm.array, - chart._shm = shm + # curve by default + ohlc=False, - # TODO: figure out if we can roll our own `FillToThreshold` to - # get brush filled polygons for OS/OB conditions. - # ``pg.FillBetweenItems`` seems to be one technique using - # generic fills between curve types while ``PlotCurveItem`` has - # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which - # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) - # graphics.curve.setBrush(50, 50, 200, 100) - # graphics.curve.setFillLevel(50) + # settings passed down to ``ChartPlotWidget`` + **conf.get('chart_kwargs', {}) + # static_yrange=(0, 100), + ) - if fsp_func_name == 'rsi': - # add moveable over-[sold/bought] lines - # and labels only for the 70/30 lines - level_line(chart, 20) - level_line(chart, 30, orient_v='top') - level_line(chart, 70, orient_v='bottom') - level_line(chart, 80, orient_v='top') + # display contents labels asap + chart.update_contents_labels( + len(shm.array) - 1, + # fsp_func_name + ) - chart._set_yrange() + # read last value + array = shm.array + value = array[fsp_func_name][-1] - stream = conf['stream'] - - # update chart graphics - async for value in stream: - - # TODO: provide a read sync mechanism to avoid this polling. - # the underlying issue is that a backfill and subsequent shm - # array first/last index update could result in an empty array - # read here since the stream is never torn down on the - # re-compute steps. - read_tries = 2 - while read_tries > 0: - - try: - # read last - array = shm.array - value = array[-1][fsp_func_name] - break - - except IndexError: - read_tries -= 1 - continue - - if last_val_sticky: + last_val_sticky = chart._ysticks[chart.name] last_val_sticky.update_from_data(-1, value) - # update graphics - chart.update_curve_from_array(fsp_func_name, array) + chart.update_curve_from_array(fsp_func_name, array) + + chart._shm = shm + + # TODO: figure out if we can roll our own `FillToThreshold` to + # get brush filled polygons for OS/OB conditions. + # ``pg.FillBetweenItems`` seems to be one technique using + # generic fills between curve types while ``PlotCurveItem`` has + # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which + # might be the best solution? + # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) + # graphics.curve.setBrush(50, 50, 200, 100) + # graphics.curve.setFillLevel(50) + + if fsp_func_name == 'rsi': + # add moveable over-[sold/bought] lines + # and labels only for the 70/30 lines + level_line(chart, 20) + level_line(chart, 30, orient_v='top') + level_line(chart, 70, orient_v='bottom') + level_line(chart, 80, orient_v='top') + + chart._set_yrange() + + stream = conf['stream'] + + # update chart graphics + async for value in stream: + + # TODO: provide a read sync mechanism to avoid this polling. + # the underlying issue is that a backfill and subsequent shm + # array first/last index update could result in an empty array + # read here since the stream is never torn down on the + # re-compute steps. + read_tries = 2 + while read_tries > 0: + + try: + # read last + array = shm.array + value = array[-1][fsp_func_name] + break + + except IndexError: + read_tries -= 1 + continue + + if last_val_sticky: + last_val_sticky.update_from_data(-1, value) + + # update graphics + chart.update_curve_from_array(fsp_func_name, array) async def check_for_new_bars(feed, ohlcv, linked_charts): @@ -1453,45 +1313,226 @@ async def check_for_new_bars(feed, ohlcv, linked_charts): price_chart = linked_charts.chart price_chart.default_view() - async for index in await feed.index_stream(): + async with feed.index_stream() as stream: + async for index in stream: - # update chart historical bars graphics by incrementing - # a time step and drawing the history and new bar + # update chart historical bars graphics by incrementing + # a time step and drawing the history and new bar - # When appending a new bar, in the time between the insert - # from the writing process and the Qt render call, here, - # the index of the shm buffer may be incremented and the - # (render) call here might read the new flat bar appended - # to the buffer (since -1 index read). In that case H==L and the - # body will be set as None (not drawn) on what this render call - # *thinks* is the curent bar (even though it's reading data from - # the newly inserted flat bar. - # - # HACK: We need to therefore write only the history (not the - # current bar) and then either write the current bar manually - # or place a cursor for visual cue of the current time step. + # When appending a new bar, in the time between the insert + # from the writing process and the Qt render call, here, + # the index of the shm buffer may be incremented and the + # (render) call here might read the new flat bar appended + # to the buffer (since -1 index read). In that case H==L and the + # body will be set as None (not drawn) on what this render call + # *thinks* is the curent bar (even though it's reading data from + # the newly inserted flat bar. + # + # HACK: We need to therefore write only the history (not the + # current bar) and then either write the current bar manually + # or place a cursor for visual cue of the current time step. - # XXX: this puts a flat bar on the current time step - # TODO: if we eventually have an x-axis time-step "cursor" - # we can get rid of this since it is extra overhead. - price_chart.update_ohlc_from_array( - price_chart.name, - ohlcv.array, - just_history=False, - ) - - for name in price_chart._overlays: - - price_chart.update_curve_from_array( - name, - price_chart._arrays[name] + # XXX: this puts a flat bar on the current time step + # TODO: if we eventually have an x-axis time-step "cursor" + # we can get rid of this since it is extra overhead. + price_chart.update_ohlc_from_array( + price_chart.name, + ohlcv.array, + just_history=False, ) - for name, chart in linked_charts.subplots.items(): - chart.update_curve_from_array(chart.name, chart._shm.array) + for name in price_chart._overlays: - # shift the view if in follow mode - price_chart.increment_view() + price_chart.update_curve_from_array( + name, + price_chart._arrays[name] + ) + + for name, chart in linked_charts.subplots.items(): + chart.update_curve_from_array(chart.name, chart._shm.array) + + # shift the view if in follow mode + price_chart.increment_view() + + +async def chart_symbol( + chart_app: ChartSpace, + brokername: str, + sym: str, + loglevel: str, + task_status: TaskStatus[Symbol] = trio.TASK_STATUS_IGNORED, +) -> None: + """Spawn a real-time chart widget for this symbol and app session. + + These widgets can remain up but hidden so that multiple symbols + can be viewed and switched between extremely fast. + + """ + # historical data fetch + brokermod = brokers.get_brokermod(brokername) + + async with data.open_feed( + brokername, + [sym], + loglevel=loglevel, + ) as feed: + + ohlcv: ShmArray = feed.shm + bars = ohlcv.array + symbol = feed.symbols[sym] + + task_status.started(symbol) + + # load in symbol's ohlc data + chart_app.window.setWindowTitle( + f'{symbol.key}@{symbol.brokers} ' + f'tick:{symbol.tick_size}' + ) + + # await tractor.breakpoint() + linked_charts = chart_app.linkedcharts + linked_charts._symbol = symbol + chart = linked_charts.plot_ohlc_main(symbol, bars) + + chart.setFocus() + + # plot historical vwap if available + wap_in_history = False + + if brokermod._show_wap_in_history: + + if 'bar_wap' in bars.dtype.fields: + wap_in_history = True + chart.draw_curve( + name='bar_wap', + data=bars, + add_label=False, + ) + + # size view to data once at outset + chart._set_yrange() + + # TODO: a data view api that makes this less shit + chart._shm = ohlcv + + # TODO: eventually we'll support some kind of n-compose syntax + fsp_conf = { + 'rsi': { + 'period': 14, + 'chart_kwargs': { + 'static_yrange': (0, 100), + }, + }, + + } + + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and + # derivatives) + volm = ohlcv.array['volume'] + if ( + np.all(np.isin(volm, -1)) or + np.all(np.isnan(volm)) + ): + log.warning( + f"{sym} does not seem to have volume info," + " dropping volume signals") + else: + fsp_conf.update({ + 'vwap': { + 'overlay': True, + 'anchor': 'session', + }, + }) + + async with trio.open_nursery() as n: + + # load initial fsp chain (otherwise known as "indicators") + n.start_soon( + spawn_fsps, + linked_charts, + fsp_conf, + sym, + ohlcv, + brokermod, + loglevel, + ) + + # start graphics update loop(s)after receiving first live quote + n.start_soon( + chart_from_quotes, + chart, + feed.stream, + ohlcv, + wap_in_history, + ) + + # wait for a first quote before we start any update tasks + quote = await feed.receive() + + log.info(f'Received first quote {quote}') + + n.start_soon( + check_for_new_bars, + feed, + # delay, + ohlcv, + linked_charts + ) + + # interactive testing + # n.start_soon( + # test_bed, + # ohlcv, + # chart, + # linked_charts, + # ) + + await start_order_mode(chart, symbol, brokername) + + +async def _async_main( + # implicit required argument provided by ``qtractor_run()`` + widgets: Dict[str, Any], + + symbol_key: str, + brokername: str, + loglevel: str, + +) -> None: + """ + Main Qt-trio routine invoked by the Qt loop with the widgets ``dict``. + + Provision the "main" widget with initial symbol data and root nursery. + + """ + chart_app = widgets['main'] + + # attempt to configure DPI aware font size + _font.configure_to_dpi(current_screen()) + + async with trio.open_nursery() as root_n: + + # set root nursery for spawning other charts/feeds + # that run cached in the bg + chart_app._root_n = root_n + + chart_app.load_symbol(brokername, symbol_key, loglevel) + + symbol = await root_n.start( + chart_symbol, + chart_app, + brokername, + symbol_key, + loglevel, + ) + + chart_app.window.setWindowTitle( + f'{symbol.key}@{symbol.brokers} ' + f'tick:{symbol.tick_size}' + ) + + await trio.sleep_forever() def _main( diff --git a/piker/ui/kivy/monitor.py b/piker/ui/kivy/monitor.py index 6c0a7736..6bb45bfd 100644 --- a/piker/ui/kivy/monitor.py +++ b/piker/ui/kivy/monitor.py @@ -179,121 +179,121 @@ async def _async_main( This is started with cli cmd `piker monitor`. ''' feed = DataFeed(portal, brokermod) - quote_gen, first_quotes = await feed.open_stream( + async with feed.open_stream( symbols, 'stock', rate=rate, test=test, - ) - first_quotes_list = list(first_quotes.copy().values()) - quotes = list(first_quotes.copy().values()) + ) as (quote_gen, first_quotes): + first_quotes_list = list(first_quotes.copy().values()) + quotes = list(first_quotes.copy().values()) - # build out UI - Window.set_title(f"monitor: {name}\t(press ? for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', spacing=0) + # build out UI + Window.set_title(f"monitor: {name}\t(press ? for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', spacing=0) - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._stock_bidasks + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._stock_bidasks - # add header row - headers = list(first_quotes_list[0].keys()) - headers.remove('displayable') + # add header row + headers = list(first_quotes_list[0].keys()) + headers.remove('displayable') - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header=True, - size_hint=(1, None), - ) - box.add_widget(header) - - # build table - table = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes_list: - symbol = ticker_record['symbol'] - table.append_row( - symbol, - Row( - ticker_record, - headers=('symbol',), - bidasks=bidasks, - no_cell=('displayable',), - table=table - ) + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header=True, + size_hint=(1, None), ) - table.last_clicked_row = next(iter(table.symbols2rows.values())) + box.add_widget(header) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = table - - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(table.sort_key) - sort_cell.bold = sort_cell.underline = True - table.last_clicked_col_cell = sort_cell - - # set up a pager view for large ticker lists - table.bind(minimum_height=table.setter('height')) - - async def spawn_opts_chain(): - """Spawn an options chain UI in a new subactor. - """ - from .option_chain import _async_main - - try: - async with tractor.open_nursery() as tn: - portal = await tn.run_in_actor( - 'optschain', - _async_main, - symbol=table.last_clicked_row._last_record['symbol'], - brokername=brokermod.name, - loglevel=tractor.log.get_loglevel(), + # build table + table = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes_list: + symbol = ticker_record['symbol'] + table.append_row( + symbol, + Row( + ticker_record, + headers=('symbol',), + bidasks=bidasks, + no_cell=('displayable',), + table=table ) - except tractor.RemoteActorError: - # don't allow option chain errors to crash this monitor - # this is, like, the most basic of resliency policies - log.exception(f"{portal.actor.name} crashed:") + ) + table.last_clicked_row = next(iter(table.symbols2rows.values())) - async with trio.open_nursery() as nursery: - pager = PagerView( - container=box, - contained=table, - nursery=nursery, - # spawn an option chain on 'o' keybinding - kbctls={('o',): spawn_opts_chain}, - ) - box.add_widget(pager) + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = table - widgets = { - 'root': box, - 'table': table, - 'box': box, - 'header': header, - 'pager': pager, - } + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(table.sort_key) + sort_cell.bold = sort_cell.underline = True + table.last_clicked_col_cell = sort_cell - global _widgets - _widgets = widgets + # set up a pager view for large ticker lists + table.bind(minimum_height=table.setter('height')) - nursery.start_soon( - update_quotes, - nursery, - brokermod.format_stock_quote, - widgets, - quote_gen, - feed._symbol_data_cache, - quotes - ) - try: - await async_runTouchApp(widgets['root']) - finally: - # cancel remote data feed task - await quote_gen.aclose() - # cancel GUI update task - nursery.cancel_scope.cancel() + async def spawn_opts_chain(): + """Spawn an options chain UI in a new subactor. + """ + from .option_chain import _async_main + + try: + async with tractor.open_nursery() as tn: + portal = await tn.run_in_actor( + 'optschain', + _async_main, + symbol=table.last_clicked_row._last_record['symbol'], + brokername=brokermod.name, + loglevel=tractor.log.get_loglevel(), + ) + except tractor.RemoteActorError: + # don't allow option chain errors to crash this monitor + # this is, like, the most basic of resliency policies + log.exception(f"{portal.actor.name} crashed:") + + async with trio.open_nursery() as nursery: + pager = PagerView( + container=box, + contained=table, + nursery=nursery, + # spawn an option chain on 'o' keybinding + kbctls={('o',): spawn_opts_chain}, + ) + box.add_widget(pager) + + widgets = { + 'root': box, + 'table': table, + 'box': box, + 'header': header, + 'pager': pager, + } + + global _widgets + _widgets = widgets + + nursery.start_soon( + update_quotes, + nursery, + brokermod.format_stock_quote, + widgets, + quote_gen, + feed._symbol_data_cache, + quotes + ) + try: + await async_runTouchApp(widgets['root']) + finally: + # cancel remote data feed task + await quote_gen.aclose() + # cancel GUI update task + nursery.cancel_scope.cancel()