Merge pull request #169 from pikers/tractor_open_stream_from

Port to new tractor stream api
binance_backend
goodboy 2021-05-04 10:41:49 -04:00 committed by GitHub
commit 9de02321d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 699 additions and 656 deletions

View File

@ -20,7 +20,6 @@ Real-time data feed machinery
import time import time
from functools import partial from functools import partial
from dataclasses import dataclass, field from dataclasses import dataclass, field
from itertools import cycle
import socket import socket
import json import json
from types import ModuleType from types import ModuleType
@ -31,7 +30,6 @@ from typing import (
Sequence Sequence
) )
import contextlib import contextlib
from operator import itemgetter
import trio import trio
import tractor import tractor
@ -182,6 +180,8 @@ async def symbol_data(broker: str, tickers: List[str]):
_feeds_cache = {} _feeds_cache = {}
# TODO: use the version of this from .api ?
@asynccontextmanager @asynccontextmanager
async def get_cached_feed( async def get_cached_feed(
brokername: str, brokername: str,
@ -326,6 +326,7 @@ class DataFeed:
self.quote_gen = None self.quote_gen = None
self._symbol_data_cache: Dict[str, Any] = {} self._symbol_data_cache: Dict[str, Any] = {}
@asynccontextmanager
async def open_stream( async def open_stream(
self, self,
symbols: Sequence[str], symbols: Sequence[str],
@ -351,40 +352,32 @@ class DataFeed:
# subscribe for tickers (this performs a possible filtering # subscribe for tickers (this performs a possible filtering
# where invalid symbols are discarded) # where invalid symbols are discarded)
sd = await self.portal.run( sd = await self.portal.run(
"piker.brokers.data", symbol_data,
'symbol_data',
broker=self.brokermod.name, broker=self.brokermod.name,
tickers=symbols tickers=symbols
) )
self._symbol_data_cache.update(sd) self._symbol_data_cache.update(sd)
if test: log.info(f"Starting new stream for {symbols}")
# stream from a local test file
quote_gen = await self.portal.run(
"piker.brokers.data",
'stream_from_file',
filename=test,
)
else:
log.info(f"Starting new stream for {symbols}")
# start live streaming from broker daemon
quote_gen = await self.portal.run(
"piker.brokers.data",
'start_quote_stream',
broker=self.brokermod.name,
symbols=symbols,
feed_type=feed_type,
rate=rate,
)
# get first quotes response # start live streaming from broker daemon
log.debug(f"Waiting on first quote for {symbols}...") async with self.portal.open_stream_from(
quotes = {} start_quote_stream,
quotes = await quote_gen.__anext__() broker=self.brokermod.name,
symbols=symbols,
feed_type=feed_type,
rate=rate,
) as quote_gen:
# get first quotes response
log.debug(f"Waiting on first quote for {symbols}...")
quotes = {}
quotes = await quote_gen.__anext__()
self.quote_gen = quote_gen
self.first_quotes = quotes
yield quote_gen, quotes
self.quote_gen = quote_gen
self.first_quotes = quotes
return quote_gen, quotes
except Exception: except Exception:
if self.quote_gen: if self.quote_gen:
await self.quote_gen.aclose() await self.quote_gen.aclose()
@ -406,8 +399,7 @@ class DataFeed:
"""Call a broker ``Client`` method using RPC and return result. """Call a broker ``Client`` method using RPC and return result.
""" """
return await self.portal.run( return await self.portal.run(
'piker.brokers.data', call_client,
'call_client',
broker=self.brokermod.name, broker=self.brokermod.name,
methname=method, methname=method,
**kwargs **kwargs
@ -425,27 +417,29 @@ async def stream_to_file(
"""Record client side received quotes to file ``filename``. """Record client side received quotes to file ``filename``.
""" """
# an async generator instance # an async generator instance
agen = await portal.run( async with portal.open_stream_from(
"piker.brokers.data", 'start_quote_stream', start_quote_stream,
broker=brokermod.name, symbols=tickers) broker=brokermod.name,
symbols=tickers
) as agen:
fname = filename or f'{watchlist_name}.jsonstream' fname = filename or f'{watchlist_name}.jsonstream'
with open(fname, 'a') as f: with open(fname, 'a') as f:
async for quotes in agen: async for quotes in agen:
f.write(json.dumps(quotes)) f.write(json.dumps(quotes))
f.write('\n--\n') f.write('\n--\n')
return fname return fname
async def stream_from_file( # async def stream_from_file(
filename: str, # filename: str,
): # ):
with open(filename, 'r') as quotes_file: # with open(filename, 'r') as quotes_file:
content = quotes_file.read() # content = quotes_file.read()
pkts = content.split('--')[:-1] # simulate 2 separate quote packets # pkts = content.split('--')[:-1] # simulate 2 separate quote packets
payloads = [json.loads(pkt) for pkt in pkts] # payloads = [json.loads(pkt) for pkt in pkts]
for payload in cycle(payloads): # for payload in cycle(payloads):
yield payload # yield payload
await trio.sleep(0.3) # await trio.sleep(0.3)

View File

@ -246,23 +246,24 @@ async def open_ems(
async with maybe_open_emsd(broker) as portal: async with maybe_open_emsd(broker) as portal:
trades_stream = await portal.run( async with portal.open_stream_from(
_emsd_main, _emsd_main,
client_actor_name=actor.name, client_actor_name=actor.name,
broker=broker, broker=broker,
symbol=symbol.key, symbol=symbol.key,
) ) as trades_stream:
with trio.fail_after(10): with trio.fail_after(10):
await book._ready_to_receive.wait() await book._ready_to_receive.wait()
try: try:
yield book, trades_stream yield book, trades_stream
finally: finally:
# TODO: we want to eventually keep this up (by having # TODO: we want to eventually keep this up (by having
# the exec loop keep running in the pikerd tree) but for # the exec loop keep running in the pikerd tree) but for
# now we have to kill the context to avoid backpressure # now we have to kill the context to avoid backpressure
# build-up on the shm write loop. # build-up on the shm write loop.
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await trades_stream.aclose() await trades_stream.aclose()

View File

@ -339,130 +339,131 @@ async def process_broker_trades(
""" """
broker = feed.mod.name broker = feed.mod.name
with trio.fail_after(5): # TODO: make this a context
# in the paper engine case this is just a mem receive channel # in the paper engine case this is just a mem receive channel
trades_stream = await feed.recv_trades_data() async with feed.receive_trades_data() as trades_stream:
first = await trades_stream.__anext__() first = await trades_stream.__anext__()
# startup msg expected as first from broker backend # startup msg expected as first from broker backend
assert first['local_trades'] == 'start' assert first['local_trades'] == 'start'
task_status.started() task_status.started()
async for event in trades_stream: async for event in trades_stream:
name, msg = event['local_trades'] name, msg = event['local_trades']
log.info(f'Received broker trade event:\n{pformat(msg)}') log.info(f'Received broker trade event:\n{pformat(msg)}')
if name == 'position': if name == 'position':
msg['resp'] = 'position' msg['resp'] = 'position'
# relay through
await ctx.send_yield(msg)
continue
# Get the broker (order) request id, this **must** be normalized
# into messaging provided by the broker backend
reqid = msg['reqid']
# make response packet to EMS client(s)
oid = book._broker2ems_ids.get(reqid)
if oid is None:
# paper engine race case: ``Client.submit_limit()`` hasn't
# returned yet and provided an output reqid to register
# locally, so we need to retreive the oid that was already
# packed at submission since we already know it ahead of
# time
paper = msg.get('paper_info')
if paper:
oid = paper['oid']
else:
msg.get('external')
if not msg:
log.error(f"Unknown trade event {event}")
# relay through
await ctx.send_yield(msg)
continue continue
resp = { # Get the broker (order) request id, this **must** be normalized
'resp': None, # placeholder # into messaging provided by the broker backend
'oid': oid reqid = msg['reqid']
}
if name in ( # make response packet to EMS client(s)
'error', oid = book._broker2ems_ids.get(reqid)
):
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# This looks like a supervision policy for pending orders on if oid is None:
# some unexpected failure - something we need to think more # paper engine race case: ``Client.submit_limit()`` hasn't
# about. In most default situations, with composed orders # returned yet and provided an output reqid to register
# (ex. brackets), most brokers seem to use a oca policy. # locally, so we need to retreive the oid that was already
# packed at submission since we already know it ahead of
# time
paper = msg.get('paper_info')
if paper:
oid = paper['oid']
message = msg['message']
# XXX should we make one when it's blank?
log.error(pformat(message))
# TODO: getting this bs, prolly need to handle status messages
# 'Market data farm connection is OK:usfarm.nj'
# another stupid ib error to handle
# if 10147 in message: cancel
# don't relay message to order requester client
continue
elif name in (
'status',
):
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# everyone doin camel case
status = msg['status'].lower()
if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not msg['remaining']:
resp['resp'] = 'broker_executed'
log.info(f'Execution for {oid} is complete!')
# just log it
else: else:
log.info(f'{broker} filled {msg}') msg.get('external')
if not msg:
log.error(f"Unknown trade event {event}")
else: continue
# one of (submitted, cancelled)
resp['resp'] = 'broker_' + status
elif name in ( resp = {
'fill', 'resp': None, # placeholder
): 'oid': oid
# proxy through the "fill" result(s) }
resp['resp'] = 'broker_filled'
resp.update(msg)
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') if name in (
'error',
):
# TODO: figure out how this will interact with EMS clients
# for ex. on an error do we react with a dark orders
# management response, like cancelling all dark orders?
# respond to requesting client # This looks like a supervision policy for pending orders on
await ctx.send_yield(resp) # some unexpected failure - something we need to think more
# about. In most default situations, with composed orders
# (ex. brackets), most brokers seem to use a oca policy.
message = msg['message']
# XXX should we make one when it's blank?
log.error(pformat(message))
# TODO: getting this bs, prolly need to handle status messages
# 'Market data farm connection is OK:usfarm.nj'
# another stupid ib error to handle
# if 10147 in message: cancel
# don't relay message to order requester client
continue
elif name in (
'status',
):
# TODO: templating the ib statuses in comparison with other
# brokers is likely the way to go:
# https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313
# short list:
# - PendingSubmit
# - PendingCancel
# - PreSubmitted (simulated orders)
# - ApiCancelled (cancelled by client before submission
# to routing)
# - Cancelled
# - Filled
# - Inactive (reject or cancelled but not by trader)
# everyone doin camel case
status = msg['status'].lower()
if status == 'filled':
# conditional execution is fully complete, no more
# fills for the noted order
if not msg['remaining']:
resp['resp'] = 'broker_executed'
log.info(f'Execution for {oid} is complete!')
# just log it
else:
log.info(f'{broker} filled {msg}')
else:
# one of (submitted, cancelled)
resp['resp'] = 'broker_' + status
elif name in (
'fill',
):
# proxy through the "fill" result(s)
resp['resp'] = 'broker_filled'
resp.update(msg)
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
# respond to requesting client
await ctx.send_yield(resp)
async def process_order_cmds( async def process_order_cmds(
@ -675,17 +676,17 @@ async def _emsd_main(
# acting as an EMS client and will submit orders) to # acting as an EMS client and will submit orders) to
# receive requests pushed over a tractor stream # receive requests pushed over a tractor stream
# using (for now) an async generator. # using (for now) an async generator.
order_stream = await portal.run( async with portal.open_stream_from(
send_order_cmds, send_order_cmds,
symbol_key=symbol, symbol_key=symbol,
) ) as order_stream:
# start inbound order request processing # start inbound order request processing
await process_order_cmds( await process_order_cmds(
ctx, ctx,
order_stream, order_stream,
symbol, symbol,
feed, feed,
client, client,
dark_book, dark_book,
) )

View File

@ -51,6 +51,7 @@ from ._sampling import (
iter_ohlc_periods, iter_ohlc_periods,
sample_and_broadcast, sample_and_broadcast,
) )
from .ingest import get_ingestormod
log = get_logger(__name__) log = get_logger(__name__)
@ -302,6 +303,7 @@ class Feed:
async def receive(self) -> dict: async def receive(self) -> dict:
return await self.stream.__anext__() return await self.stream.__anext__()
@asynccontextmanager
async def index_stream( async def index_stream(
self, self,
delay_s: Optional[int] = None delay_s: Optional[int] = None
@ -312,14 +314,16 @@ class Feed:
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
self._index_stream = await self._brokerd_portal.run( async with self._brokerd_portal.open_stream_from(
iter_ohlc_periods, iter_ohlc_periods,
delay_s=delay_s or self._max_sample_rate, delay_s=delay_s or self._max_sample_rate,
) ) as self._index_stream:
yield self._index_stream
else:
yield self._index_stream
return self._index_stream @asynccontextmanager
async def receive_trades_data(self) -> AsyncIterator[dict]:
async def recv_trades_data(self) -> AsyncIterator[dict]:
if not getattr(self.mod, 'stream_trades', False): if not getattr(self.mod, 'stream_trades', False):
log.warning( log.warning(
@ -333,7 +337,7 @@ class Feed:
# using the ``_.set_fake_trades_stream()`` method # using the ``_.set_fake_trades_stream()`` method
if self._trade_stream is None: if self._trade_stream is None:
self._trade_stream = await self._brokerd_portal.run( async with self._brokerd_portal.open_stream_from(
self.mod.stream_trades, self.mod.stream_trades,
@ -342,9 +346,10 @@ class Feed:
# in messages, though we could probably use # in messages, though we could probably use
# more then one? # more then one?
topics=['local_trades'], topics=['local_trades'],
) ) as self._trade_stream:
yield self._trade_stream
return self._trade_stream else:
yield self._trade_stream
def sym_to_shm_key( def sym_to_shm_key(
@ -373,64 +378,64 @@ async def open_feed(
# TODO: do all! # TODO: do all!
sym = symbols[0] sym = symbols[0]
async with maybe_spawn_brokerd( # TODO: compress these to one line with py3.9+
brokername, async with maybe_spawn_brokerd(brokername, loglevel=loglevel) as portal:
loglevel=loglevel,
) as portal: async with portal.open_stream_from(
stream = await portal.run(
attach_feed_bus, attach_feed_bus,
brokername=brokername, brokername=brokername,
symbol=sym, symbol=sym,
loglevel=loglevel, loglevel=loglevel
)
# TODO: can we make this work better with the proposed ) as stream:
# context based bidirectional streaming style api proposed in:
# https://github.com/goodboy/tractor/issues/53
init_msg = await stream.receive()
# we can only read from shm # TODO: can we make this work better with the proposed
shm = attach_shm_array( # context based bidirectional streaming style api proposed in:
token=init_msg[sym]['shm_token'], # https://github.com/goodboy/tractor/issues/53
readonly=True, init_msg = await stream.receive()
)
feed = Feed( # we can only read from shm
name=brokername, shm = attach_shm_array(
stream=stream, token=init_msg[sym]['shm_token'],
shm=shm, readonly=True,
mod=mod,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
for sym, data in init_msg.items():
si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
symbol = Symbol(
key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
) )
symbol.broker_info[brokername] = si
feed.symbols[sym] = symbol feed = Feed(
name=brokername,
stream=stream,
shm=shm,
mod=mod,
_brokerd_portal=portal,
)
ohlc_sample_rates = []
# cast shm dtype to list... can't member why we need this for sym, data in init_msg.items():
shm_token = data['shm_token']
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates) si = data['symbol_info']
ohlc_sample_rates.append(data['sample_rate'])
try: symbol = Symbol(
yield feed key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
)
symbol.broker_info[brokername] = si
finally: feed.symbols[sym] = symbol
# always cancel the far end producer task
with trio.CancelScope(shield=True): # cast shm dtype to list... can't member why we need this
await stream.aclose() shm_token = data['shm_token']
shm_token['dtype_descr'] = list(shm_token['dtype_descr'])
assert shm_token == shm.token # sanity
feed._max_sample_rate = max(ohlc_sample_rates)
try:
yield feed
finally:
# always cancel the far end producer task
with trio.CancelScope(shield=True):
await stream.aclose()

View File

@ -167,24 +167,25 @@ async def cascade(
# Increment the underlying shared memory buffer on every # Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async for msg in await feed.index_stream(): async with feed.index_stream() as stream:
async for msg in stream:
new_len = len(src.array) new_len = len(src.array)
if new_len > last_len + 1: if new_len > last_len + 1:
# respawn the signal compute task if the source # respawn the signal compute task if the source
# signal has been updated # signal has been updated
cs.cancel() cs.cancel()
cs = await n.start(fsp_compute) cs = await n.start(fsp_compute)
# TODO: adopt an incremental update engine/approach # TODO: adopt an incremental update engine/approach
# where possible here eventually! # where possible here eventually!
# read out last shm row # read out last shm row
array = dst.array array = dst.array
last = array[-1:].copy() last = array[-1:].copy()
# write new row to the shm buffer # write new row to the shm buffer
dst.push(last) dst.push(last)
last_len = new_len last_len = new_len

View File

@ -19,6 +19,7 @@ High level Qt chart widgets.
""" """
from typing import Tuple, Dict, Any, Optional, Callable from typing import Tuple, Dict, Any, Optional, Callable
from types import ModuleType
from functools import partial from functools import partial
from PyQt5 import QtCore, QtGui from PyQt5 import QtCore, QtGui
@ -26,6 +27,7 @@ import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
import tractor import tractor
import trio import trio
from trio_typing import TaskStatus
from ._axes import ( from ._axes import (
DynamicDateAxis, DynamicDateAxis,
@ -53,6 +55,7 @@ from ._style import (
_bars_to_left_in_follow_mode, _bars_to_left_in_follow_mode,
) )
from ..data._source import Symbol from ..data._source import Symbol
from ..data._sharedmem import ShmArray
from .. import brokers from .. import brokers
from .. import data from .. import data
from ..data import maybe_open_shm_array from ..data import maybe_open_shm_array
@ -128,7 +131,8 @@ class ChartSpace(QtGui.QWidget):
# self.toolbar_layout.addWidget(self.strategy_box) # self.toolbar_layout.addWidget(self.strategy_box)
def load_symbol( def load_symbol(
self, self,
symbol: Symbol, brokername: str,
symbol_key: str,
data: np.ndarray, data: np.ndarray,
ohlc: bool = True, ohlc: bool = True,
) -> None: ) -> None:
@ -136,12 +140,6 @@ class ChartSpace(QtGui.QWidget):
Expects a ``numpy`` structured array containing all the ohlcv fields. Expects a ``numpy`` structured array containing all the ohlcv fields.
""" """
# XXX: let's see if this causes mem problems
self.window.setWindowTitle(
f'{symbol.key}@{symbol.brokers} '
f'tick:{symbol.tick_size}'
)
# TODO: symbol search # TODO: symbol search
# # of course this doesn't work :eyeroll: # # of course this doesn't work :eyeroll:
# h = _font.boundingRect('Ag').height() # h = _font.boundingRect('Ag').height()
@ -151,19 +149,18 @@ class ChartSpace(QtGui.QWidget):
# self.symbol_label.setText(f'/`{symbol}`') # self.symbol_label.setText(f'/`{symbol}`')
linkedcharts = self._chart_cache.setdefault( linkedcharts = self._chart_cache.setdefault(
symbol.key, symbol_key,
LinkedSplitCharts(symbol) LinkedSplitCharts(self)
) )
self.linkedcharts = linkedcharts
# remove any existing plots # remove any existing plots
if not self.v_layout.isEmpty(): if not self.v_layout.isEmpty():
self.v_layout.removeWidget(linkedcharts) self.v_layout.removeWidget(linkedcharts)
main_chart = linkedcharts.plot_ohlc_main(symbol, data)
self.v_layout.addWidget(linkedcharts) self.v_layout.addWidget(linkedcharts)
return linkedcharts, main_chart return linkedcharts
# TODO: add signalling painter system # TODO: add signalling painter system
# def add_signals(self): # def add_signals(self):
@ -187,13 +184,14 @@ class LinkedSplitCharts(QtGui.QWidget):
def __init__( def __init__(
self, self,
symbol: Symbol, chart_space: ChartSpace,
) -> None: ) -> None:
super().__init__() super().__init__()
self.signals_visible: bool = False self.signals_visible: bool = False
self._cursor: Cursor = None # crosshair graphics self._cursor: Cursor = None # crosshair graphics
self.chart: ChartPlotWidget = None # main (ohlc) chart self.chart: ChartPlotWidget = None # main (ohlc) chart
self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {} self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {}
self.chart_space = chart_space
self.xaxis = DynamicDateAxis( self.xaxis = DynamicDateAxis(
orientation='bottom', orientation='bottom',
@ -215,7 +213,7 @@ class LinkedSplitCharts(QtGui.QWidget):
self.layout.addWidget(self.splitter) self.layout.addWidget(self.splitter)
# state tracker? # state tracker?
self._symbol: Symbol = symbol self._symbol: Symbol = None
@property @property
def symbol(self) -> Symbol: def symbol(self) -> Symbol:
@ -939,135 +937,6 @@ async def test_bed(
# rlabel.setPos(vb_right - 2*w, d_coords.y()) # rlabel.setPos(vb_right - 2*w, d_coords.y())
async def _async_main(
# implicit required argument provided by ``qtractor_run()``
widgets: Dict[str, Any],
sym: str,
brokername: str,
loglevel: str,
) -> None:
"""Main Qt-trio routine invoked by the Qt loop with
the widgets ``dict``.
"""
chart_app = widgets['main']
# attempt to configure DPI aware font size
_font.configure_to_dpi(current_screen())
# chart_app.init_search()
# historical data fetch
brokermod = brokers.get_brokermod(brokername)
async with data.open_feed(
brokername,
[sym],
loglevel=loglevel,
) as feed:
ohlcv = feed.shm
bars = ohlcv.array
symbol = feed.symbols[sym]
# load in symbol's ohlc data
linked_charts, chart = chart_app.load_symbol(symbol, bars)
# plot historical vwap if available
wap_in_history = False
if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve(
name='bar_wap',
data=bars,
add_label=False,
)
# size view to data once at outset
chart._set_yrange()
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
# TODO: eventually we'll support some kind of n-compose syntax
fsp_conf = {
'rsi': {
'period': 14,
'chart_kwargs': {
'static_yrange': (0, 100),
},
},
}
# make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and
# derivatives)
volm = ohlcv.array['volume']
if (
np.all(np.isin(volm, -1)) or
np.all(np.isnan(volm))
):
log.warning(
f"{sym} does not seem to have volume info,"
" dropping volume signals")
else:
fsp_conf.update({
'vwap': {
'overlay': True,
'anchor': 'session',
},
})
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
spawn_fsps,
linked_charts,
fsp_conf,
sym,
ohlcv,
brokermod,
loglevel,
)
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
wap_in_history,
)
# wait for a first quote before we start any update tasks
quote = await feed.receive()
log.info(f'Received first quote {quote}')
n.start_soon(
check_for_new_bars,
feed,
# delay,
ohlcv,
linked_charts
)
# interactive testing
# n.start_soon(
# test_bed,
# ohlcv,
# chart,
# linked_charts,
# )
await start_order_mode(chart, symbol, brokername)
async def chart_from_quotes( async def chart_from_quotes(
chart: ChartPlotWidget, chart: ChartPlotWidget,
stream, stream,
@ -1245,7 +1114,7 @@ async def spawn_fsps(
""" """
# spawns sub-processes which execute cpu bound FSP code # spawns sub-processes which execute cpu bound FSP code
async with tractor.open_nursery() as n: async with tractor.open_nursery(loglevel=loglevel) as n:
# spawns local task that consume and chart data streams from # spawns local task that consume and chart data streams from
# sub-procs # sub-procs
@ -1280,66 +1149,36 @@ async def spawn_fsps(
conf['shm'] = shm conf['shm'] = shm
# spawn closure, can probably define elsewhere portal = await n.start_actor(
async def spawn_fsp_daemon( enable_modules=['piker.fsp'],
fsp_name: str, name=display_name,
display_name: str, )
conf: dict,
):
"""Start an fsp subactor async.
""" # init async
# print(f'FSP NAME: {fsp_name}')
portal = await n.run_in_actor(
# subactor entrypoint
fsp.cascade,
# name as title of sub-chart
name=display_name,
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token,
symbol=sym,
fsp_func_name=fsp_name,
# tractor config
loglevel=loglevel,
)
stream = await portal.result()
# receive last index for processed historical
# data-array as first msg
_ = await stream.receive()
conf['stream'] = stream
conf['portal'] = portal
# new local task
ln.start_soon( ln.start_soon(
spawn_fsp_daemon, run_fsp,
portal,
linked_charts,
brokermod,
sym,
src_shm,
fsp_func_name, fsp_func_name,
display_name, display_name,
conf, conf,
) )
# blocks here until all daemons up # blocks here until all fsp actors complete
# start and block on update loops
async with trio.open_nursery() as ln:
for fsp_func_name, conf in fsps.items():
ln.start_soon(
update_signals,
linked_charts,
fsp_func_name,
conf,
)
async def update_signals( async def run_fsp(
portal: tractor._portal.Portal,
linked_charts: LinkedSplitCharts, linked_charts: LinkedSplitCharts,
brokermod: ModuleType,
sym: str,
src_shm: ShmArray,
fsp_func_name: str, fsp_func_name: str,
display_name: str,
conf: Dict[str, Any], conf: Dict[str, Any],
) -> None: ) -> None:
@ -1348,96 +1187,117 @@ async def update_signals(
This is called once for each entry in the fsp This is called once for each entry in the fsp
config map. config map.
""" """
shm = conf['shm'] async with portal.open_stream_from(
if conf.get('overlay'): # subactor entrypoint
chart = linked_charts.chart fsp.cascade,
chart.draw_curve(
name='vwap',
data=shm.array,
overlay=True,
)
last_val_sticky = None
else: # name as title of sub-chart
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=conf['shm'].token,
symbol=sym,
fsp_func_name=fsp_func_name,
chart = linked_charts.add_plot( ) as stream:
name=fsp_func_name,
array=shm.array,
# curve by default # receive last index for processed historical
ohlc=False, # data-array as first msg
_ = await stream.receive()
# settings passed down to ``ChartPlotWidget`` conf['stream'] = stream
**conf.get('chart_kwargs', {}) conf['portal'] = portal
# static_yrange=(0, 100),
)
# display contents labels asap shm = conf['shm']
chart.update_contents_labels(
len(shm.array) - 1,
# fsp_func_name
)
# read last value if conf.get('overlay'):
array = shm.array chart = linked_charts.chart
value = array[fsp_func_name][-1] chart.draw_curve(
name='vwap',
data=shm.array,
overlay=True,
)
last_val_sticky = None
last_val_sticky = chart._ysticks[chart.name] else:
last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(fsp_func_name, array) chart = linked_charts.add_plot(
name=fsp_func_name,
array=shm.array,
chart._shm = shm # curve by default
ohlc=False,
# TODO: figure out if we can roll our own `FillToThreshold` to # settings passed down to ``ChartPlotWidget``
# get brush filled polygons for OS/OB conditions. **conf.get('chart_kwargs', {})
# ``pg.FillBetweenItems`` seems to be one technique using # static_yrange=(0, 100),
# generic fills between curve types while ``PlotCurveItem`` has )
# logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
# might be the best solution?
# graphics = chart.update_from_array(chart.name, array[fsp_func_name])
# graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50)
if fsp_func_name == 'rsi': # display contents labels asap
# add moveable over-[sold/bought] lines chart.update_contents_labels(
# and labels only for the 70/30 lines len(shm.array) - 1,
level_line(chart, 20) # fsp_func_name
level_line(chart, 30, orient_v='top') )
level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top')
chart._set_yrange() # read last value
array = shm.array
value = array[fsp_func_name][-1]
stream = conf['stream'] last_val_sticky = chart._ysticks[chart.name]
# update chart graphics
async for value in stream:
# TODO: provide a read sync mechanism to avoid this polling.
# the underlying issue is that a backfill and subsequent shm
# array first/last index update could result in an empty array
# read here since the stream is never torn down on the
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][fsp_func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky:
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)
# update graphics chart.update_curve_from_array(fsp_func_name, array)
chart.update_curve_from_array(fsp_func_name, array)
chart._shm = shm
# TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions.
# ``pg.FillBetweenItems`` seems to be one technique using
# generic fills between curve types while ``PlotCurveItem`` has
# logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
# might be the best solution?
# graphics = chart.update_from_array(chart.name, array[fsp_func_name])
# graphics.curve.setBrush(50, 50, 200, 100)
# graphics.curve.setFillLevel(50)
if fsp_func_name == 'rsi':
# add moveable over-[sold/bought] lines
# and labels only for the 70/30 lines
level_line(chart, 20)
level_line(chart, 30, orient_v='top')
level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top')
chart._set_yrange()
stream = conf['stream']
# update chart graphics
async for value in stream:
# TODO: provide a read sync mechanism to avoid this polling.
# the underlying issue is that a backfill and subsequent shm
# array first/last index update could result in an empty array
# read here since the stream is never torn down on the
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][fsp_func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky:
last_val_sticky.update_from_data(-1, value)
# update graphics
chart.update_curve_from_array(fsp_func_name, array)
async def check_for_new_bars(feed, ohlcv, linked_charts): async def check_for_new_bars(feed, ohlcv, linked_charts):
@ -1453,45 +1313,226 @@ async def check_for_new_bars(feed, ohlcv, linked_charts):
price_chart = linked_charts.chart price_chart = linked_charts.chart
price_chart.default_view() price_chart.default_view()
async for index in await feed.index_stream(): async with feed.index_stream() as stream:
async for index in stream:
# update chart historical bars graphics by incrementing # update chart historical bars graphics by incrementing
# a time step and drawing the history and new bar # a time step and drawing the history and new bar
# When appending a new bar, in the time between the insert # When appending a new bar, in the time between the insert
# from the writing process and the Qt render call, here, # from the writing process and the Qt render call, here,
# the index of the shm buffer may be incremented and the # the index of the shm buffer may be incremented and the
# (render) call here might read the new flat bar appended # (render) call here might read the new flat bar appended
# to the buffer (since -1 index read). In that case H==L and the # to the buffer (since -1 index read). In that case H==L and the
# body will be set as None (not drawn) on what this render call # body will be set as None (not drawn) on what this render call
# *thinks* is the curent bar (even though it's reading data from # *thinks* is the curent bar (even though it's reading data from
# the newly inserted flat bar. # the newly inserted flat bar.
# #
# HACK: We need to therefore write only the history (not the # HACK: We need to therefore write only the history (not the
# current bar) and then either write the current bar manually # current bar) and then either write the current bar manually
# or place a cursor for visual cue of the current time step. # or place a cursor for visual cue of the current time step.
# XXX: this puts a flat bar on the current time step # XXX: this puts a flat bar on the current time step
# TODO: if we eventually have an x-axis time-step "cursor" # TODO: if we eventually have an x-axis time-step "cursor"
# we can get rid of this since it is extra overhead. # we can get rid of this since it is extra overhead.
price_chart.update_ohlc_from_array( price_chart.update_ohlc_from_array(
price_chart.name, price_chart.name,
ohlcv.array, ohlcv.array,
just_history=False, just_history=False,
)
for name in price_chart._overlays:
price_chart.update_curve_from_array(
name,
price_chart._arrays[name]
) )
for name, chart in linked_charts.subplots.items(): for name in price_chart._overlays:
chart.update_curve_from_array(chart.name, chart._shm.array)
# shift the view if in follow mode price_chart.update_curve_from_array(
price_chart.increment_view() name,
price_chart._arrays[name]
)
for name, chart in linked_charts.subplots.items():
chart.update_curve_from_array(chart.name, chart._shm.array)
# shift the view if in follow mode
price_chart.increment_view()
async def chart_symbol(
chart_app: ChartSpace,
brokername: str,
sym: str,
loglevel: str,
task_status: TaskStatus[Symbol] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Spawn a real-time chart widget for this symbol and app session.
These widgets can remain up but hidden so that multiple symbols
can be viewed and switched between extremely fast.
"""
# historical data fetch
brokermod = brokers.get_brokermod(brokername)
async with data.open_feed(
brokername,
[sym],
loglevel=loglevel,
) as feed:
ohlcv: ShmArray = feed.shm
bars = ohlcv.array
symbol = feed.symbols[sym]
task_status.started(symbol)
# load in symbol's ohlc data
chart_app.window.setWindowTitle(
f'{symbol.key}@{symbol.brokers} '
f'tick:{symbol.tick_size}'
)
# await tractor.breakpoint()
linked_charts = chart_app.linkedcharts
linked_charts._symbol = symbol
chart = linked_charts.plot_ohlc_main(symbol, bars)
chart.setFocus()
# plot historical vwap if available
wap_in_history = False
if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve(
name='bar_wap',
data=bars,
add_label=False,
)
# size view to data once at outset
chart._set_yrange()
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
# TODO: eventually we'll support some kind of n-compose syntax
fsp_conf = {
'rsi': {
'period': 14,
'chart_kwargs': {
'static_yrange': (0, 100),
},
},
}
# make sure that the instrument supports volume history
# (sometimes this is not the case for some commodities and
# derivatives)
volm = ohlcv.array['volume']
if (
np.all(np.isin(volm, -1)) or
np.all(np.isnan(volm))
):
log.warning(
f"{sym} does not seem to have volume info,"
" dropping volume signals")
else:
fsp_conf.update({
'vwap': {
'overlay': True,
'anchor': 'session',
},
})
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
spawn_fsps,
linked_charts,
fsp_conf,
sym,
ohlcv,
brokermod,
loglevel,
)
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
wap_in_history,
)
# wait for a first quote before we start any update tasks
quote = await feed.receive()
log.info(f'Received first quote {quote}')
n.start_soon(
check_for_new_bars,
feed,
# delay,
ohlcv,
linked_charts
)
# interactive testing
# n.start_soon(
# test_bed,
# ohlcv,
# chart,
# linked_charts,
# )
await start_order_mode(chart, symbol, brokername)
async def _async_main(
# implicit required argument provided by ``qtractor_run()``
widgets: Dict[str, Any],
symbol_key: str,
brokername: str,
loglevel: str,
) -> None:
"""
Main Qt-trio routine invoked by the Qt loop with the widgets ``dict``.
Provision the "main" widget with initial symbol data and root nursery.
"""
chart_app = widgets['main']
# attempt to configure DPI aware font size
_font.configure_to_dpi(current_screen())
async with trio.open_nursery() as root_n:
# set root nursery for spawning other charts/feeds
# that run cached in the bg
chart_app._root_n = root_n
chart_app.load_symbol(brokername, symbol_key, loglevel)
symbol = await root_n.start(
chart_symbol,
chart_app,
brokername,
symbol_key,
loglevel,
)
chart_app.window.setWindowTitle(
f'{symbol.key}@{symbol.brokers} '
f'tick:{symbol.tick_size}'
)
await trio.sleep_forever()
def _main( def _main(

View File

@ -179,121 +179,121 @@ async def _async_main(
This is started with cli cmd `piker monitor`. This is started with cli cmd `piker monitor`.
''' '''
feed = DataFeed(portal, brokermod) feed = DataFeed(portal, brokermod)
quote_gen, first_quotes = await feed.open_stream( async with feed.open_stream(
symbols, symbols,
'stock', 'stock',
rate=rate, rate=rate,
test=test, test=test,
) ) as (quote_gen, first_quotes):
first_quotes_list = list(first_quotes.copy().values()) first_quotes_list = list(first_quotes.copy().values())
quotes = list(first_quotes.copy().values()) quotes = list(first_quotes.copy().values())
# build out UI # build out UI
Window.set_title(f"monitor: {name}\t(press ? for help)") Window.set_title(f"monitor: {name}\t(press ? for help)")
Builder.load_string(_kv) Builder.load_string(_kv)
box = BoxLayout(orientation='vertical', spacing=0) box = BoxLayout(orientation='vertical', spacing=0)
# define bid-ask "stacked" cells # define bid-ask "stacked" cells
# (TODO: needs some rethinking and renaming for sure) # (TODO: needs some rethinking and renaming for sure)
bidasks = brokermod._stock_bidasks bidasks = brokermod._stock_bidasks
# add header row # add header row
headers = list(first_quotes_list[0].keys()) headers = list(first_quotes_list[0].keys())
headers.remove('displayable') headers.remove('displayable')
header = Row( header = Row(
{key: key for key in headers}, {key: key for key in headers},
headers=headers, headers=headers,
bidasks=bidasks, bidasks=bidasks,
is_header=True, is_header=True,
size_hint=(1, None), size_hint=(1, None),
)
box.add_widget(header)
# build table
table = TickerTable(
cols=1,
size_hint=(1, None),
)
for ticker_record in first_quotes_list:
symbol = ticker_record['symbol']
table.append_row(
symbol,
Row(
ticker_record,
headers=('symbol',),
bidasks=bidasks,
no_cell=('displayable',),
table=table
)
) )
table.last_clicked_row = next(iter(table.symbols2rows.values())) box.add_widget(header)
# associate the col headers row with the ticker table even though # build table
# they're technically wrapped separately in containing BoxLayout table = TickerTable(
header.table = table cols=1,
size_hint=(1, None),
# mark the initial sorted column header as bold and underlined )
sort_cell = header.get_cell(table.sort_key) for ticker_record in first_quotes_list:
sort_cell.bold = sort_cell.underline = True symbol = ticker_record['symbol']
table.last_clicked_col_cell = sort_cell table.append_row(
symbol,
# set up a pager view for large ticker lists Row(
table.bind(minimum_height=table.setter('height')) ticker_record,
headers=('symbol',),
async def spawn_opts_chain(): bidasks=bidasks,
"""Spawn an options chain UI in a new subactor. no_cell=('displayable',),
""" table=table
from .option_chain import _async_main
try:
async with tractor.open_nursery() as tn:
portal = await tn.run_in_actor(
'optschain',
_async_main,
symbol=table.last_clicked_row._last_record['symbol'],
brokername=brokermod.name,
loglevel=tractor.log.get_loglevel(),
) )
except tractor.RemoteActorError: )
# don't allow option chain errors to crash this monitor table.last_clicked_row = next(iter(table.symbols2rows.values()))
# this is, like, the most basic of resliency policies
log.exception(f"{portal.actor.name} crashed:")
async with trio.open_nursery() as nursery: # associate the col headers row with the ticker table even though
pager = PagerView( # they're technically wrapped separately in containing BoxLayout
container=box, header.table = table
contained=table,
nursery=nursery,
# spawn an option chain on 'o' keybinding
kbctls={('o',): spawn_opts_chain},
)
box.add_widget(pager)
widgets = { # mark the initial sorted column header as bold and underlined
'root': box, sort_cell = header.get_cell(table.sort_key)
'table': table, sort_cell.bold = sort_cell.underline = True
'box': box, table.last_clicked_col_cell = sort_cell
'header': header,
'pager': pager,
}
global _widgets # set up a pager view for large ticker lists
_widgets = widgets table.bind(minimum_height=table.setter('height'))
nursery.start_soon( async def spawn_opts_chain():
update_quotes, """Spawn an options chain UI in a new subactor.
nursery, """
brokermod.format_stock_quote, from .option_chain import _async_main
widgets,
quote_gen, try:
feed._symbol_data_cache, async with tractor.open_nursery() as tn:
quotes portal = await tn.run_in_actor(
) 'optschain',
try: _async_main,
await async_runTouchApp(widgets['root']) symbol=table.last_clicked_row._last_record['symbol'],
finally: brokername=brokermod.name,
# cancel remote data feed task loglevel=tractor.log.get_loglevel(),
await quote_gen.aclose() )
# cancel GUI update task except tractor.RemoteActorError:
nursery.cancel_scope.cancel() # don't allow option chain errors to crash this monitor
# this is, like, the most basic of resliency policies
log.exception(f"{portal.actor.name} crashed:")
async with trio.open_nursery() as nursery:
pager = PagerView(
container=box,
contained=table,
nursery=nursery,
# spawn an option chain on 'o' keybinding
kbctls={('o',): spawn_opts_chain},
)
box.add_widget(pager)
widgets = {
'root': box,
'table': table,
'box': box,
'header': header,
'pager': pager,
}
global _widgets
_widgets = widgets
nursery.start_soon(
update_quotes,
nursery,
brokermod.format_stock_quote,
widgets,
quote_gen,
feed._symbol_data_cache,
quotes
)
try:
await async_runTouchApp(widgets['root'])
finally:
# cancel remote data feed task
await quote_gen.aclose()
# cancel GUI update task
nursery.cancel_scope.cancel()