bar_select
Tyler Goodlet 2020-09-17 13:22:01 -04:00
parent 6fa4f6e943
commit cd540fd07e
3 changed files with 52 additions and 107 deletions

View File

@ -4,10 +4,9 @@ Data buffers for fast shared humpy.
import time import time
import tractor import tractor
import numpy as np
import trio import trio
from ._sharedmem import SharedArray, attach_shared_array from ._sharedmem import attach_shared_array
@tractor.stream @tractor.stream
@ -40,35 +39,33 @@ async def incr_buffer(
async def sleep(): async def sleep():
"""Sleep until next time frames worth has passed from last bar. """Sleep until next time frames worth has passed from last bar.
""" """
# last_ts = shm.array[-1]['time'] last_ts = shm.array[-1]['time']
# delay = max((last_ts + ad) - time.time(), 0) delay = max((last_ts + ad) - time.time(), 0)
# await trio.sleep(delay) await trio.sleep(delay)
await trio.sleep(ad) # await trio.sleep(ad)
while True: while True:
# sleep for duration of current bar # sleep for duration of current bar
await sleep() await sleep()
# TODO: in theory we could make this faster by copying the
# "last" readable value into the underlying larger buffer's
# next value and then incrementing the counter instead of
# using ``.push()``?
# append new entry to buffer thus "incrementing" the bar # append new entry to buffer thus "incrementing" the bar
array = shm.array array = shm.array
last = array[-1:].copy() last = array[-1:].copy()
# last = np.array(last, dtype=array.dtype)
# shm.push(last)
# array = shm.array
# last = array[-1].copy()
(index, t, close) = last[0][['index', 'time', 'close']] (index, t, close) = last[0][['index', 'time', 'close']]
# new = np.array(last, dtype=array.dtype)
# this copies non-std fields (eg. vwap) from the last datum # this copies non-std fields (eg. vwap) from the last datum
last[ last[
['index', 'time', 'volume', 'open', 'high', 'low', 'close'] ['index', 'time', 'volume', 'open', 'high', 'low', 'close']
][0] = (index + 1, t + delay_s, 0, close, close, close, close) ][0] = (index + 1, t + delay_s, 0, close, close, close, close)
# write to the buffer # write to the buffer
print('incrementing array')
# await tractor.breakpoint()
shm.push(last) shm.push(last)
# print('incrementing array')
# yield the new buffer index value # yield the new buffer index value
await ctx.send_yield(shm._i.value) await ctx.send_yield(shm._i.value)

View File

@ -58,7 +58,6 @@ class SharedInt:
create=create, create=create,
size=4, # std int size=4, # std int
) )
self._token = self._shm.name
@property @property
def value(self) -> int: def value(self) -> int:
@ -68,6 +67,12 @@ class SharedInt:
def value(self, value) -> None: def value(self, value) -> None:
self._shm.buf[:] = value.to_bytes(4, byteorder) self._shm.buf[:] = value.to_bytes(4, byteorder)
def destroy(self) -> None:
if shared_memory._USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
class SharedArray: class SharedArray:
def __init__( def __init__(
@ -87,7 +92,7 @@ class SharedArray:
@property @property
def token(self) -> Tuple[str, str]: def token(self) -> Tuple[str, str]:
return (self._shm.name, self._i._token) return (self._shm.name, self._i._shm.name)
@property @property
def name(self) -> str: def name(self) -> str:
@ -130,8 +135,8 @@ class SharedArray:
if shared_memory._USE_POSIX: if shared_memory._USE_POSIX:
# We manually unlink to bypass all the "resource tracker" # We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems. # nonsense meant for non-SC systems.
shm_unlink(self._i._shm.name)
shm_unlink(self._shm.name) shm_unlink(self._shm.name)
self._i.destroy()
def flush(self) -> None: def flush(self) -> None:
# TODO: flush to storage backend like markestore? # TODO: flush to storage backend like markestore?
@ -141,9 +146,8 @@ class SharedArray:
@contextmanager @contextmanager
def open_shared_array( def open_shared_array(
name: Optional[str] = None, name: Optional[str] = None,
create: bool = True, # approx number of 5s bars in a "day" x2
# approx number of 5s bars in a "day" size: int = int(2*60*60*10/5),
size: int = int(60*60*10/5),
dtype: np.dtype = base_ohlc_dtype, dtype: np.dtype = base_ohlc_dtype,
readonly: bool = False, readonly: bool = False,
) -> SharedArray: ) -> SharedArray:
@ -155,7 +159,11 @@ def open_shared_array(
# create new shared mem segment for which we # create new shared mem segment for which we
# have write permission # have write permission
a = np.zeros(size, dtype=dtype) a = np.zeros(size, dtype=dtype)
shm = shared_memory.SharedMemory(name=name, create=True, size=a.nbytes) shm = shared_memory.SharedMemory(
name=name,
create=True,
size=a.nbytes
)
shmarr = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf) shmarr = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
shmarr[:] = a[:] shmarr[:] = a[:]
shmarr.setflags(write=int(not readonly)) shmarr.setflags(write=int(not readonly))

View File

@ -23,7 +23,6 @@ from .. import data
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..log import get_logger from ..log import get_logger
from ._exec import run_qtractor from ._exec import run_qtractor
from ._source import base_ohlc_dtype
from ._interaction import ChartView from ._interaction import ChartView
from .. import fsp from .. import fsp
@ -588,7 +587,7 @@ class ChartPlotWidget(pg.PlotWidget):
self.scene().leaveEvent(ev) self.scene().leaveEvent(ev)
async def check_for_new_bars(delay_s, ohlcv, linked_charts): async def check_for_new_bars(incr_stream, ohlcv, linked_charts):
"""Task which updates from new bars in the shared ohlcv buffer every """Task which updates from new bars in the shared ohlcv buffer every
``delay_s`` seconds. ``delay_s`` seconds.
""" """
@ -597,36 +596,9 @@ async def check_for_new_bars(delay_s, ohlcv, linked_charts):
# Likely the best way to solve this is to make this task # Likely the best way to solve this is to make this task
# aware of the instrument's tradable hours? # aware of the instrument's tradable hours?
# adjust delay to compensate for trio processing time
ad = delay_s - 0.002
price_chart = linked_charts.chart price_chart = linked_charts.chart
# ohlc = price_chart._array
async def sleep():
"""Sleep until next time frames worth has passed from last bar.
"""
# last_ts = ohlcv.array[-1]['time']
# delay = max((last_ts + ad) - time.time(), 0)
# await trio.sleep(delay)
await trio.sleep(ad)
# sleep for duration of current bar
await sleep()
while True:
# TODO: bunch of stuff:
# - I'm starting to think all this logic should be
# done in one place and "graphics update routines"
# should not be doing any length checking and array diffing.
# - don't keep appending, but instead increase the
# underlying array's size less frequently
# - handle odd lot orders
# - update last open price correctly instead
# of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does?
async for index in incr_stream:
# TODO: generalize this increment logic # TODO: generalize this increment logic
for name, chart in linked_charts.subplots.items(): for name, chart in linked_charts.subplots.items():
data = chart._array data = chart._array
@ -635,15 +607,6 @@ async def check_for_new_bars(delay_s, ohlcv, linked_charts):
np.array(data[-1], dtype=data.dtype) np.array(data[-1], dtype=data.dtype)
) )
# read value at "open" of bar
# last_quote = ohlc[-1]
# XXX: If the last bar has not changed print a flat line and
# move to the next. This is a "animation" choice that we may not
# keep.
# if last_quote == ohlc[-1]:
# log.debug("Printing flat line for {sym}")
# print(ohlcv.array)
# update chart historical bars graphics # update chart historical bars graphics
price_chart.update_from_array( price_chart.update_from_array(
price_chart.name, price_chart.name,
@ -670,17 +633,6 @@ async def check_for_new_bars(delay_s, ohlcv, linked_charts):
chart.update_from_array(chart.name, chart._array) chart.update_from_array(chart.name, chart._array)
chart._set_yrange() chart._set_yrange()
# We **don't** update the bar right now
# since the next quote that arrives should in the
# tick streaming task
await sleep()
# TODO: should we update a graphics again time here?
# Think about race conditions with data update task.
# UPDATE: don't think this should matter know since the last bar
# and the prior historical bars are being updated in 2 separate
# steps now.
async def _async_main( async def _async_main(
sym: str, sym: str,
@ -721,11 +673,6 @@ async def _async_main(
overlay=True, overlay=True,
) )
# determine ohlc delay between bars
# to determine time step between datums
times = bars['time']
delay = times[-1] - times[times != times[-1]][-1]
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
@ -759,7 +706,8 @@ async def _async_main(
) )
n.start_soon( n.start_soon(
check_for_new_bars, check_for_new_bars,
delay, incr_stream,
# delay,
ohlcv, ohlcv,
linked_charts linked_charts
) )
@ -776,41 +724,35 @@ async def chart_from_quotes(
) -> None: ) -> None:
"""The 'main' (price) chart real-time update loop. """The 'main' (price) chart real-time update loop.
""" """
# TODO: bunch of stuff:
# - I'm starting to think all this logic should be
# done in one place and "graphics update routines"
# should not be doing any length checking and array diffing.
# - handle odd lot orders
# - update last open price correctly instead
# of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does?
last_price_sticky = chart._ysticks[chart.name] last_price_sticky = chart._ysticks[chart.name]
print_next = False
async for quotes in stream: async for quotes in stream:
for sym, quote in quotes.items(): for sym, quote in quotes.items():
for tick in iterticks(quote, type='trade'): for tick in iterticks(quote, type='trade'):
# TODO: eventually we'll want to update # TODO:
# bid/ask labels and other data as # - eventually we'll want to update bid/ask labels and
# subscribed by underlying UI consumers. # other data as subscribed by underlying UI consumers.
# last_close = ohlcv.array[-1]['close'] # - in theory we should be able to read buffer data
# faster then msgs arrive.. needs some tinkering and
# last = quote.get('last') or quote['close'] # testing
# last = tick['price'] start = time.time()
array = ohlcv.array
# if print_next: diff = time.time() - start
# print(f"next last: {last}") print(f'read time: {diff}')
# print_next = False # last = ohlcv.array[-1]
last = array[-1]
# if last_close != last:
# log.error(f"array last_close: {last_close}\nlast: {last}")
# print_next = True
# update ohlc (I guess we're enforcing this
# for now?) overwrite from quote
# high, low = chart._array[-1][['high', 'low']]
# chart._array[['high', 'low', 'close']][-1] = (
# max(high, last),
# min(low, last),
# last,
# )
last = ohlcv.array[-1]
chart.update_from_array( chart.update_from_array(
chart.name, chart.name,
ohlcv.array, # ohlcv.array,
array,
) )
# update sticky(s) # update sticky(s)
last_price_sticky.update_from_data( last_price_sticky.update_from_data(
@ -819,13 +761,11 @@ async def chart_from_quotes(
vwap = quote.get('vwap') vwap = quote.get('vwap')
if vwap and vwap_in_history: if vwap and vwap_in_history:
# chart._array['vwap'][-1] = vwap
last['vwap'] = vwap last['vwap'] = vwap
print(f"vwap: {quote['vwap']}") print(f"vwap: {quote['vwap']}")
# update vwap overlay line # update vwap overlay line
chart.update_from_array( chart.update_from_array(
'vwap', 'vwap',
# chart._array['vwap'],
ohlcv.array['vwap'], ohlcv.array['vwap'],
) )