Port charts to new shm arrays

bar_select
Tyler Goodlet 2020-09-17 09:25:30 -04:00
parent f872fbecf8
commit 6fa4f6e943
1 changed files with 130 additions and 110 deletions

View File

@ -20,6 +20,7 @@ from ._style import _xaxis_at, _min_points_to_show, hcolor
from ._source import Symbol from ._source import Symbol
from .. import brokers from .. import brokers
from .. import data from .. import data
from ..data._normalize import iterticks
from ..log import get_logger from ..log import get_logger
from ._exec import run_qtractor from ._exec import run_qtractor
from ._source import base_ohlc_dtype from ._source import base_ohlc_dtype
@ -483,6 +484,7 @@ class ChartPlotWidget(pg.PlotWidget):
array: np.ndarray, array: np.ndarray,
**kwargs, **kwargs,
) -> pg.GraphicsObject: ) -> pg.GraphicsObject:
self._array = array
graphics = self._graphics[name] graphics = self._graphics[name]
graphics.update_from_array(array, **kwargs) graphics.update_from_array(array, **kwargs)
@ -586,8 +588,9 @@ class ChartPlotWidget(pg.PlotWidget):
self.scene().leaveEvent(ev) self.scene().leaveEvent(ev)
async def add_new_bars(delay_s, linked_charts): async def check_for_new_bars(delay_s, ohlcv, linked_charts):
"""Task which inserts new bars into the ohlc every ``delay_s`` seconds. """Task which updates from new bars in the shared ohlcv buffer every
``delay_s`` seconds.
""" """
# TODO: right now we'll spin printing bars if the last time # TODO: right now we'll spin printing bars if the last time
# stamp is before a large period of no market activity. # stamp is before a large period of no market activity.
@ -598,29 +601,19 @@ async def add_new_bars(delay_s, linked_charts):
ad = delay_s - 0.002 ad = delay_s - 0.002
price_chart = linked_charts.chart price_chart = linked_charts.chart
ohlc = price_chart._array # ohlc = price_chart._array
async def sleep(): async def sleep():
"""Sleep until next time frames worth has passed from last bar. """Sleep until next time frames worth has passed from last bar.
""" """
last_ts = ohlc[-1]['time'] # last_ts = ohlcv.array[-1]['time']
delay = max((last_ts + ad) - time.time(), 0) # delay = max((last_ts + ad) - time.time(), 0)
await trio.sleep(delay) # await trio.sleep(delay)
await trio.sleep(ad)
# sleep for duration of current bar # sleep for duration of current bar
await sleep() await sleep()
def incr_ohlc_array(array: np.ndarray):
(index, t, close) = array[-1][['index', 'time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum
_next = np.array(array[-1], dtype=array.dtype)
_next[
['index', 'time', 'volume', 'open', 'high', 'low', 'close']
] = (index + 1, t + delay_s, 0, close, close, close, close)
new_array = np.append(array, _next,)
return new_array
while True: while True:
# TODO: bunch of stuff: # TODO: bunch of stuff:
# - I'm starting to think all this logic should be # - I'm starting to think all this logic should be
@ -633,12 +626,6 @@ async def add_new_bars(delay_s, linked_charts):
# of copying it from last bar's close # of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does? # - 5 sec bar lookback-autocorrection like tws does?
# add new increment/bar
start = time.time()
ohlc = price_chart._array = incr_ohlc_array(ohlc)
diff = time.time() - start
print(f'array append took {diff}')
# TODO: generalize this increment logic # TODO: generalize this increment logic
for name, chart in linked_charts.subplots.items(): for name, chart in linked_charts.subplots.items():
@ -655,11 +642,12 @@ async def add_new_bars(delay_s, linked_charts):
# keep. # keep.
# if last_quote == ohlc[-1]: # if last_quote == ohlc[-1]:
# log.debug("Printing flat line for {sym}") # log.debug("Printing flat line for {sym}")
# print(ohlcv.array)
# update chart historical bars graphics # update chart historical bars graphics
price_chart.update_from_array( price_chart.update_from_array(
price_chart.name, price_chart.name,
ohlc, ohlcv.array,
# When appending a new bar, in the time between the insert # When appending a new bar, in the time between the insert
# here and the Qt render call the underlying price data may # here and the Qt render call the underlying price data may
# have already been updated, thus make sure to also update # have already been updated, thus make sure to also update
@ -712,107 +700,134 @@ async def _async_main(
# historical data fetch # historical data fetch
brokermod = brokers.get_brokermod(brokername) brokermod = brokers.get_brokermod(brokername)
async with brokermod.get_client() as client: async with data.open_feed(
# figure out the exact symbol brokername,
bars = await client.bars(symbol=sym) [sym],
loglevel=loglevel,
) as (fquote, stream, incr_stream, ohlcv):
# allow broker to declare historical data fields bars = ohlcv.array
ohlc_dtype = getattr(brokermod, 'ohlc_dtype', base_ohlc_dtype)
# remember, msgpack-numpy's ``from_buffer` returns read-only array # load in symbol's ohlc data
bars = np.array(bars[list(ohlc_dtype.names)]) linked_charts, chart = chart_app.load_symbol(sym, bars)
# load in symbol's ohlc data # plot historical vwap if available
linked_charts, chart = chart_app.load_symbol(sym, bars) vwap_in_history = False
if 'vwap' in bars.dtype.fields:
vwap_in_history = True
chart.draw_curve(
name='vwap',
data=bars['vwap'],
overlay=True,
)
# plot historical vwap if available # determine ohlc delay between bars
vwap_in_history = False # to determine time step between datums
if 'vwap' in bars.dtype.fields: times = bars['time']
vwap_in_history = True delay = times[-1] - times[times != times[-1]][-1]
chart.draw_curve(
name='vwap',
data=bars['vwap'],
overlay=True,
)
# determine ohlc delay between bars async with trio.open_nursery() as n:
times = bars['time']
# find expected time step between datums # load initial fsp chain (otherwise known as "indicators")
delay = times[-1] - times[times != times[-1]][-1] n.start_soon(
chart_from_fsp,
linked_charts,
'rsi', # eventually will be n-compose syntax
sym,
bars,
brokermod,
loglevel,
)
async with trio.open_nursery() as n: # update last price sticky
last_price_sticky = chart._ysticks[chart.name]
# load initial fsp chain (otherwise known as "indicators") last_price_sticky.update_from_data(
n.start_soon( *ohlcv.array[-1][['index', 'close']]
chart_from_fsp, )
linked_charts,
'rsi',
sym,
bars,
brokermod,
loglevel,
)
# update last price sticky
last_price_sticky = chart._ysticks[chart.name]
last_price_sticky.update_from_data(
*chart._array[-1][['index', 'close']]
)
# graphics update loop
async with data.open_feed(
brokername,
[sym],
loglevel=loglevel,
) as (fquote, stream):
# wait for a first quote before we start any update tasks # wait for a first quote before we start any update tasks
quote = await stream.__anext__() quote = await stream.__anext__()
log.info(f'RECEIVED FIRST QUOTE {quote}') log.info(f'Received first quote {quote}')
# start graphics tasks after receiving first live quote # start graphics update loop(s)after receiving first live quote
n.start_soon(add_new_bars, delay, linked_charts) n.start_soon(
chart_from_quotes,
chart,
stream,
ohlcv,
vwap_in_history,
)
n.start_soon(
check_for_new_bars,
delay,
ohlcv,
linked_charts
)
async for quotes in stream: # probably where we'll eventually start the user input loop
for sym, quote in quotes.items(): await trio.sleep_forever()
ticks = quote.get('ticks', ())
for tick in ticks:
if tick.get('type') == 'trade':
# TODO: eventually we'll want to update
# bid/ask labels and other data as
# subscribed by underlying UI consumers.
# last = quote.get('last') or quote['close']
last = tick['price']
# update ohlc (I guess we're enforcing this async def chart_from_quotes(
# for now?) overwrite from quote chart: ChartPlotWidget,
high, low = chart._array[-1][['high', 'low']] stream,
chart._array[['high', 'low', 'close']][-1] = ( ohlcv: np.ndarray,
max(high, last), vwap_in_history: bool = False,
min(low, last), ) -> None:
last, """The 'main' (price) chart real-time update loop.
) """
chart.update_from_array(
chart.name,
chart._array,
)
# update sticky(s)
last_price_sticky.update_from_data(
*chart._array[-1][['index', 'close']])
chart._set_yrange()
vwap = quote.get('vwap') last_price_sticky = chart._ysticks[chart.name]
if vwap and vwap_in_history:
chart._array['vwap'][-1] = vwap print_next = False
print(f"vwap: {quote['vwap']}") async for quotes in stream:
# update vwap overlay line for sym, quote in quotes.items():
chart.update_from_array( for tick in iterticks(quote, type='trade'):
'vwap', # TODO: eventually we'll want to update
chart._array['vwap'], # bid/ask labels and other data as
) # subscribed by underlying UI consumers.
# last_close = ohlcv.array[-1]['close']
# last = quote.get('last') or quote['close']
# last = tick['price']
# if print_next:
# print(f"next last: {last}")
# print_next = False
# if last_close != last:
# log.error(f"array last_close: {last_close}\nlast: {last}")
# print_next = True
# update ohlc (I guess we're enforcing this
# for now?) overwrite from quote
# high, low = chart._array[-1][['high', 'low']]
# chart._array[['high', 'low', 'close']][-1] = (
# max(high, last),
# min(low, last),
# last,
# )
last = ohlcv.array[-1]
chart.update_from_array(
chart.name,
ohlcv.array,
)
# update sticky(s)
last_price_sticky.update_from_data(
*last[['index', 'close']])
chart._set_yrange()
vwap = quote.get('vwap')
if vwap and vwap_in_history:
# chart._array['vwap'][-1] = vwap
last['vwap'] = vwap
print(f"vwap: {quote['vwap']}")
# update vwap overlay line
chart.update_from_array(
'vwap',
# chart._array['vwap'],
ohlcv.array['vwap'],
)
async def chart_from_fsp( async def chart_from_fsp(
@ -882,7 +897,12 @@ async def chart_from_fsp(
# update chart graphics # update chart graphics
async for value in stream: async for value in stream:
# start = time.time()
chart._array[-1] = value chart._array[-1] = value
# diff = time.time() - start
# print(f'FSP array append took {diff}')
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)
chart.update_from_array(chart.name, chart._array) chart.update_from_array(chart.name, chart._array)
# chart._set_yrange() # chart._set_yrange()