Compare commits

...

33 Commits

Author SHA1 Message Date
Tyler Goodlet 9aeb8603ac Use .shield() meth name from tractor 2020-12-19 14:39:12 -05:00
Tyler Goodlet a790a59962 Drop profile calls on OHLC bars for now 2020-12-19 14:39:12 -05:00
Tyler Goodlet 7cf3c4a86c Add signal backfilling via trio task respawn 2020-12-19 14:39:12 -05:00
Tyler Goodlet 47856666b5 Drop legacy "historical" QPicture cruft 2020-12-19 14:39:11 -05:00
Tyler Goodlet ff103070ea More general salutation 2020-12-19 14:39:11 -05:00
Tyler Goodlet 639f34eaac Stick with slightly smaller fonts 2020-12-19 14:39:11 -05:00
Tyler Goodlet 38798c7a53 Fix axes for shm primary indexing 2020-12-19 14:39:11 -05:00
Tyler Goodlet e34fe6c594 Port charting to new shm primary indexing 2020-12-19 14:39:11 -05:00
Tyler Goodlet be18c75428 Close app on last window exit
Use a system triggered SIGINT on app close to tear down the streaming
stack and terminate the `trio`/`tractor` runtimes deterministically.
2020-12-19 14:39:11 -05:00
Tyler Goodlet c8073ad891 Port kraken to declare "wap" field 2020-12-19 14:39:11 -05:00
Tyler Goodlet ffb405c74b Port data apis to not touch primary index 2020-12-19 14:39:11 -05:00
Tyler Goodlet 2bcebe779a Add historical backfilling to ib backend 2020-12-19 14:39:11 -05:00
Tyler Goodlet 4bb02ded2e Ensure right bar x index is an int 2020-12-19 14:39:11 -05:00
Tyler Goodlet 8efa2d2fc7 First draft, make graphics work on shm primary index
This is a bit hacky (what with array indexing semantics being relative
to the primary index's "start" value but it works. We'll likely want
to somehow wrap this index finagling into an API soon.
2020-12-19 14:39:11 -05:00
Tyler Goodlet 5b8adc8881 Add prepend support to shm system 2020-12-19 14:39:11 -05:00
Tyler Goodlet 5b8e72065a Left align yaxis label 2020-12-19 14:39:11 -05:00
Tyler Goodlet c18a37c75b Font size tweaks for low dpi 2020-12-19 14:39:11 -05:00
Tyler Goodlet 36445363a0 Use `numpy.divide()` to avoid divide-by-zero 2020-12-19 14:39:11 -05:00
Tyler Goodlet 599429e54b Tidy up doc string 2020-12-19 14:39:11 -05:00
Tyler Goodlet ea8f17c94b Attempt to add numba typing and use `QGraphicsPathItem`
Failed at using either.

Quirks in numba's typing require specifying readonly arrays by
composing types manually.

The graphics item path thing, while it does take less time to write on
bar appends, seems to be slower in general in calculating the
``.boundingRect()`` value. Likely we'll just add manual max/min tracking
on array updates like ``pg.PlotCurveItem`` to squeeze some final juices
on this.
2020-12-19 14:39:11 -05:00
Tyler Goodlet 26d1632888 Drop commented pixmap cruft
See #124 as to why we'll probably never need this.
2020-12-19 14:39:11 -05:00
Tyler Goodlet c79cbf41aa Get `QPainterPath` "append" working
Pertains further to #109.

Instead of redrawing the entire `QPainterPath` every time there's
a historical bars update just use `.addPath()` to slap in latest
history. It seems to work and is fast. This also seems like it will be
a great strategy for filling in earlier data, woot!
2020-12-19 14:39:11 -05:00
Tyler Goodlet 7fe62690b9 Draw bars using `QPainterPath` magic
This gives a massive speedup when viewing large bar sets (think a day's
worth of 5s bars) by using the `pg.functions.arrayToQPath()` "magic"
binary array writing that is also used in `PlotCurveItem`.  We're using
this same (lower level) function directly to draw bars as part of one
large path and it seems to be painting 15k (ish) bars with around 3ms
`.paint()` latency. The only thing still a bit slow is the path array
generation despite doing it with `numba`. Likely, either having multiple
paths or, only regenerating the missing backing array elements should
speed this up further to avoid slight delays when incrementing the bar
step.

This is of course a first draft and more cleanups are coming.
2020-12-19 14:39:11 -05:00
Tyler Goodlet 46089d36c9 Add field diffing on failed push 2020-12-19 14:39:11 -05:00
Tyler Goodlet af967b333f Tweak axis text offset and margins 2020-12-19 14:39:11 -05:00
Tyler Goodlet 26adbe382c Use new global var stack from tractor 2020-12-19 14:39:11 -05:00
Tyler Goodlet cc21b0386c Kill the tractor tree on window close.
This makes it so you don't have to ctrl-c kill apps.
Add in the experimental openGL support even though I'm pretty sure it's
not being used much for curve plotting (but could be wrong).
2020-12-19 14:39:11 -05:00
Tyler Goodlet 8695542a2a Add commented ex. code for line price charts 2020-12-19 14:39:11 -05:00
Tyler Goodlet 11a280dc10 Allocate space for 2d worth of 5s bars 2020-12-19 14:39:11 -05:00
Tyler Goodlet 1536b97b3c Break hist calc into wap func, use hlc3. 2020-12-19 14:39:11 -05:00
Tyler Goodlet a9caee17a1 Put fsp plotting into a couple tasks, startup speedups.
Break the chart update code for fsps into a new task (add a nursery) in
new `spawn_fsps` (was `chart_from_fsps`) that async requests actor
spawning and initial historical data (all CPU bound work).  For multiple
fsp subcharts this allows processing initial output in parallel
(multi-core). We might want to wrap this in a "feed" like api
eventually. Basically the fsp startup sequence is now:
- start all requested fsp actors in an async loop and wait for
  historical data to arrive
- loop through them all again to start update tasks which do chart
  graphics rendering

Add separate x-axis objects for each new subchart (required by
pyqtgraph); still need to fix hiding unnecessary ones.
Add a `ChartPlotWidget._arrays: dict` for holding overlay data distinct
from ohlc. Drop the sizing yrange to label heights for now since it's
pretty much all gone to hell since adding L1 labels. Fix y-stickies to
look up correct overly arrays.
2020-12-19 14:39:11 -05:00
Tyler Goodlet 6188b47157 Add vwap to exposed fsp map 2020-12-19 14:39:11 -05:00
Tyler Goodlet 6a2f086bd7 Add initial tina (ohl3) vwap fsp 2020-12-19 14:39:11 -05:00
16 changed files with 1381 additions and 595 deletions

View File

@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``. ``infected_aio==True``.
""" """
from contextlib import asynccontextmanager, contextmanager from contextlib import asynccontextmanager
from dataclasses import asdict from dataclasses import asdict
from functools import partial from functools import partial
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
import asyncio import asyncio
import logging import logging
@ -32,6 +33,7 @@ import itertools
import time import time
from async_generator import aclosing from async_generator import aclosing
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails from ib_insync.contract import Contract, ContractDetails
from ib_insync.ticker import Ticker from ib_insync.ticker import Ticker
import ib_insync as ibis import ib_insync as ibis
@ -45,7 +47,7 @@ from ..data import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
iterticks, iterticks,
attach_shm_array, attach_shm_array,
get_shm_token, # get_shm_token,
subscribe_ohlc_for_increment, subscribe_ohlc_for_increment,
) )
from ..data._source import from_df from ..data._source import from_df
@ -86,6 +88,8 @@ _time_frames = {
'Y': 'OneYear', 'Y': 'OneYear',
} }
_show_wap_in_history = False
# overrides to sidestep pretty questionable design decisions in # overrides to sidestep pretty questionable design decisions in
# ``ib_insync``: # ``ib_insync``:
@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = {
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
} }
_enters = 0
class Client: class Client:
"""IB wrapped for our broker backend API. """IB wrapped for our broker backend API.
@ -142,32 +148,54 @@ class Client:
self.ib = ib self.ib = ib
self.ib.RaiseRequestErrors = True self.ib.RaiseRequestErrors = True
# NOTE: the ib.client here is "throttled" to 45 rps by default
async def bars( async def bars(
self, self,
symbol: str, symbol: str,
# EST in ISO 8601 format is required... below is EPOCH # EST in ISO 8601 format is required... below is EPOCH
start_date: str = "1970-01-01T00:00:00.000000-05:00", start_dt: str = "1970-01-01T00:00:00.000000-05:00",
time_frame: str = '1m', end_dt: str = "",
count: int = int(2e3), # <- max allowed per query
is_paid_feed: bool = False, sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present. """Retreive OHLCV bars for a symbol over a range to the present.
""" """
bars_kwargs = {'whatToShow': 'TRADES'} bars_kwargs = {'whatToShow': 'TRADES'}
global _enters
print(f'ENTER BARS {_enters}')
_enters += 1
contract = await self.find_contract(symbol) contract = await self.find_contract(symbol)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count) # _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync( bars = await self.ib.reqHistoricalDataAsync(
contract, contract,
endDateTime='', endDateTime=end_dt,
# durationStr='60 S',
# durationStr='1 D', # time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
# OHLC sampling values:
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
# barSizeSetting='1 secs',
# durationStr='{count} S'.format(count=15000 * 5),
# durationStr='{count} D'.format(count=1),
# barSizeSetting='5 secs',
durationStr='{count} S'.format(count=period_count),
barSizeSetting='1 secs',
# barSizeSetting='1 min',
# time length calcs
durationStr='{count} S'.format(count=5000 * 5),
barSizeSetting='5 secs',
# always use extended hours # always use extended hours
useRTH=False, useRTH=False,
@ -181,9 +209,13 @@ class Client:
# TODO: raise underlying error here # TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?") raise ValueError(f"No bars retreived for {symbol}?")
# TODO: rewrite this faster with ``numba``
# convert to pandas dataframe: # convert to pandas dataframe:
df = ibis.util.df(bars) df = ibis.util.df(bars)
return from_df(df) return bars, from_df(df)
def onError(self, reqId, errorCode, errorString, contract) -> None:
breakpoint()
async def search_stocks( async def search_stocks(
self, self,
@ -237,6 +269,8 @@ class Client:
"""Get an unqualifed contract for the current "continous" future. """Get an unqualifed contract for the current "continous" future.
""" """
contcon = ibis.ContFuture(symbol, exchange=exchange) contcon = ibis.ContFuture(symbol, exchange=exchange)
# it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId) return ibis.Future(conId=frontcon.conId)
@ -279,10 +313,10 @@ class Client:
if exch in ('PURE', 'TSE'): # non-yankee if exch in ('PURE', 'TSE'): # non-yankee
currency = 'CAD' currency = 'CAD'
if exch in ('PURE',): if exch in ('PURE', 'TSE'):
# stupid ib... # stupid ib...
primaryExchange = exch
exch = 'SMART' exch = 'SMART'
primaryExchange = 'PURE'
con = ibis.Stock( con = ibis.Stock(
symbol=sym, symbol=sym,
@ -293,10 +327,27 @@ class Client:
try: try:
exch = 'SMART' if not exch else exch exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0] contract = (await self.ib.qualifyContractsAsync(con))[0]
head = await self.get_head_time(contract)
print(head)
except IndexError: except IndexError:
raise ValueError(f"No contract could be found {con}") raise ValueError(f"No contract could be found {con}")
return contract return contract
async def get_head_time(
self,
contract: Contract,
) -> datetime:
"""Return the first datetime stamp for ``contract``.
"""
return await self.ib.reqHeadTimeStampAsync(
contract,
whatToShow='TRADES',
useRTH=False,
formatDate=2, # timezone aware UTC datetime
)
async def stream_ticker( async def stream_ticker(
self, self,
symbol: str, symbol: str,
@ -309,7 +360,13 @@ class Client:
contract = await self.find_contract(symbol) contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
def push(t): def push(t):
"""Push quotes to trio task.
"""
# log.debug(t) # log.debug(t)
try: try:
to_trio.send_nowait(t) to_trio.send_nowait(t)
@ -346,9 +403,17 @@ async def _aio_get_client(
""" """
# first check cache for existing client # first check cache for existing client
# breakpoint()
try: try:
yield _client_cache[(host, port)] if port:
except KeyError: client = _client_cache[(host, port)]
else:
# grab first cached client
client = list(_client_cache.values())[0]
yield client
except (KeyError, IndexError):
# TODO: in case the arbiter has no record # TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one. # of existing brokerd we need to broadcast for one.
@ -359,9 +424,11 @@ async def _aio_get_client(
ib = NonShittyIB() ib = NonShittyIB()
ports = _try_ports if port is None else [port] ports = _try_ports if port is None else [port]
_err = None _err = None
for port in ports: for port in ports:
try: try:
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id) await ib.connectAsync(host, port, clientId=client_id)
break break
except ConnectionRefusedError as ce: except ConnectionRefusedError as ce:
@ -373,6 +440,7 @@ async def _aio_get_client(
try: try:
client = Client(ib) client = Client(ib)
_client_cache[(host, port)] = client _client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}")
yield client yield client
except BaseException: except BaseException:
ib.disconnect() ib.disconnect()
@ -385,7 +453,6 @@ async def _aio_run_client_method(
from_trio=None, from_trio=None,
**kwargs, **kwargs,
) -> None: ) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client: async with _aio_get_client() as client:
async_meth = getattr(client, meth) async_meth = getattr(client, meth)
@ -402,6 +469,9 @@ async def _trio_run_client_method(
method: str, method: str,
**kwargs, **kwargs,
) -> None: ) -> None:
"""Asyncio entry point to run tasks against the ``ib_insync`` api.
"""
ca = tractor.current_actor() ca = tractor.current_actor()
assert ca.is_infected_aio() assert ca.is_infected_aio()
@ -530,18 +600,60 @@ def normalize(
_local_buffer_writers = {} _local_buffer_writers = {}
@contextmanager @asynccontextmanager
def activate_writer(key: str): async def activate_writer(key: str) -> (bool, trio.Nursery):
try: try:
writer_already_exists = _local_buffer_writers.get(key, False) writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists: if not writer_already_exists:
_local_buffer_writers[key] = True _local_buffer_writers[key] = True
yield writer_already_exists async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally: finally:
_local_buffer_writers.pop(key, None) _local_buffer_writers.pop(key, None)
async def fill_bars(
first_bars,
shm,
count: int = 21,
) -> None:
"""Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
"""
next_dt = first_bars[0].date
i = 0
while i < count:
try:
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol='.'.join(
(first_bars.contract.symbol, first_bars.contract.exchange)
),
end_dt=next_dt,
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars[0].date
except RequestError as err:
# TODO: retreive underlying ``ib_insync`` error~~
if err.code == 162:
log.exception(
"Data query rate reached: Press `ctrl-alt-f` in TWS")
await tractor.breakpoint()
# TODO: figure out how to share quote feeds sanely despite # TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api. # the wacky ``ib_insync`` api.
# @tractor.msg.pub # @tractor.msg.pub
@ -575,7 +687,9 @@ async def stream_quotes(
# check if a writer already is alive in a streaming task, # check if a writer already is alive in a streaming task,
# otherwise start one and mark it as now existing # otherwise start one and mark it as now existing
with activate_writer(shm_token['shm_name']) as writer_already_exists: async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
# maybe load historical ohlcv in to shared mem # maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous # check if shm has already been created by previous
@ -588,18 +702,29 @@ async def stream_quotes(
# we are the buffer writer # we are the buffer writer
readonly=False, readonly=False,
) )
bars = await _trio_run_client_method(
# async def retrieve_and_push():
start = time.time()
bars, bars_array = await _trio_run_client_method(
method='bars', method='bars',
symbol=sym, symbol=sym,
) )
if bars is None: log.info(f"bars_array request: {time.time() - start}")
if bars_array is None:
raise SymbolNotFound(sym) raise SymbolNotFound(sym)
# write historical data to buffer # write historical data to buffer
shm.push(bars) shm.push(bars_array)
shm_token = shm.token shm_token = shm.token
# TODO: generalize this for other brokers
# start bar filler task in bg
ln.start_soon(fill_bars, bars, shm)
times = shm.array['time'] times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s) subscribe_ohlc_for_increment(shm, delay_s)
@ -656,6 +781,7 @@ async def stream_quotes(
# real-time stream # real-time stream
async for ticker in stream: async for ticker in stream:
# print(ticker.vwap)
quote = normalize( quote = normalize(
ticker, ticker,
calc_price=calc_price calc_price=calc_price
@ -674,6 +800,8 @@ async def stream_quotes(
for tick in iterticks(quote, types=('trade', 'utrade',)): for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price'] last = tick['price']
# print(f"{quote['symbol']}: {tick}")
# update last entry # update last entry
# benchmarked in the 4-5 us range # benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][ o, high, low, v = shm.array[-1][
@ -687,7 +815,13 @@ async def stream_quotes(
# is also the close/last trade price # is also the close/last trade price
o = last o = last
shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = ( shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o, o,
max(high, last), max(high, last),
min(low, last), min(low, last),

View File

@ -57,13 +57,15 @@ _ohlc_dtype = [
('close', float), ('close', float),
('volume', float), ('volume', float),
('count', int), ('count', int),
('vwap', float), ('bar_wap', float),
] ]
# UI components allow this to be declared such that additional # UI components allow this to be declared such that additional
# (historical) fields can be exposed. # (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype) ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = True
class Client: class Client:
@ -341,7 +343,7 @@ async def stream_quotes(
while True: while True:
try: try:
async with trio_websocket.open_websocket_url( async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com', 'wss://ws.kraken.com/',
) as ws: ) as ws:
# XXX: setup subs # XXX: setup subs
@ -433,7 +435,7 @@ async def stream_quotes(
'high', 'high',
'low', 'low',
'close', 'close',
'vwap', 'bar_wap', # in this case vwap of bar
'volume'] 'volume']
][-1] = ( ][-1] = (
o, o,

View File

@ -42,7 +42,7 @@ from ._sharedmem import (
ShmArray, ShmArray,
get_shm_token, get_shm_token,
) )
from ._source import base_ohlc_dtype from ._source import base_iohlc_dtype
from ._buffer import ( from ._buffer import (
increment_ohlc_buffer, increment_ohlc_buffer,
subscribe_ohlc_for_increment subscribe_ohlc_for_increment
@ -139,6 +139,7 @@ class Feed:
name: str name: str
stream: AsyncIterator[Dict[str, Any]] stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray shm: ShmArray
# ticks: ShmArray
_broker_portal: tractor._portal.Portal _broker_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
@ -188,7 +189,7 @@ async def open_feed(
key=sym_to_shm_key(name, symbols[0]), key=sym_to_shm_key(name, symbols[0]),
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write # we expect the sub-actor to write
readonly=True, readonly=True,

View File

@ -91,19 +91,20 @@ async def increment_ohlc_buffer(
# append new entry to buffer thus "incrementing" the bar # append new entry to buffer thus "incrementing" the bar
array = shm.array array = shm.array
last = array[-1:].copy() last = array[-1:][shm._write_fields].copy()
(index, t, close) = last[0][['index', 'time', 'close']] # (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum # this copies non-std fields (eg. vwap) from the last datum
last[ last[
['index', 'time', 'volume', 'open', 'high', 'low', 'close'] ['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (index + 1, t + delay_s, 0, close, close, close, close) ][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer # write to the buffer
shm.push(last) shm.push(last)
# broadcast the buffer index step # broadcast the buffer index step
yield {'index': shm._i.value} yield {'index': shm._last.value}
def subscribe_ohlc_for_increment( def subscribe_ohlc_for_increment(

View File

@ -33,6 +33,6 @@ def iterticks(
ticks = quote.get('ticks', ()) ticks = quote.get('ticks', ())
if ticks: if ticks:
for tick in ticks: for tick in ticks:
print(f"{quote['symbol']}: {tick}") # print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types: if tick.get('type') in types:
yield tick yield tick

View File

@ -17,11 +17,10 @@
""" """
NumPy compatible shared memory buffers for real-time FSP. NumPy compatible shared memory buffers for real-time FSP.
""" """
from typing import List
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from sys import byteorder from sys import byteorder
from typing import Tuple, Optional from typing import List, Tuple, Optional
from multiprocessing import shared_memory from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker from multiprocessing import resource_tracker as mantracker
from _posixshmem import shm_unlink from _posixshmem import shm_unlink
@ -29,7 +28,7 @@ import tractor
import numpy as np import numpy as np
from ..log import get_logger from ..log import get_logger
from ._source import base_ohlc_dtype from ._source import base_ohlc_dtype, base_iohlc_dtype
log = get_logger(__name__) log = get_logger(__name__)
@ -58,17 +57,15 @@ mantracker.getfd = mantracker._resource_tracker.getfd
class SharedInt: class SharedInt:
"""Wrapper around a single entry shared memory array which
holds an ``int`` value used as an index counter.
"""
def __init__( def __init__(
self, self,
token: str, shm: SharedMemory,
create: bool = False,
) -> None: ) -> None:
# create a single entry array for storing an index counter self._shm = shm
self._shm = shared_memory.SharedMemory(
name=token,
create=create,
size=4, # std int
)
@property @property
def value(self) -> int: def value(self) -> int:
@ -79,7 +76,7 @@ class SharedInt:
self._shm.buf[:] = value.to_bytes(4, byteorder) self._shm.buf[:] = value.to_bytes(4, byteorder)
def destroy(self) -> None: def destroy(self) -> None:
if shared_memory._USE_POSIX: if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker" # We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems. # nonsense meant for non-SC systems.
shm_unlink(self._shm.name) shm_unlink(self._shm.name)
@ -91,7 +88,8 @@ class _Token:
which can be used to key a system wide post shm entry. which can be used to key a system wide post shm entry.
""" """
shm_name: str # this servers as a "key" value shm_name: str # this servers as a "key" value
shm_counter_name: str shm_first_index_name: str
shm_last_index_name: str
dtype_descr: List[Tuple[str]] dtype_descr: List[Tuple[str]]
def __post_init__(self): def __post_init__(self):
@ -130,27 +128,47 @@ def _make_token(
"""Create a serializable token that can be used """Create a serializable token that can be used
to access a shared array. to access a shared array.
""" """
dtype = base_ohlc_dtype if dtype is None else dtype dtype = base_iohlc_dtype if dtype is None else dtype
return _Token( return _Token(
key, key,
key + "_counter", key + "_first",
key + "_last",
np.dtype(dtype).descr np.dtype(dtype).descr
) )
class ShmArray: class ShmArray:
"""A shared memory ``numpy`` (compatible) array API.
An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array
can be read and written to by pushing data both onto the "front"
or "back" of a set index range. The indexes for the "first" and
"last" index are themselves stored in shared memory (accessed via
``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index.
"""
def __init__( def __init__(
self, self,
shmarr: np.ndarray, shmarr: np.ndarray,
counter: SharedInt, first: SharedInt,
shm: shared_memory.SharedMemory, last: SharedInt,
readonly: bool = True, shm: SharedMemory,
# readonly: bool = True,
) -> None: ) -> None:
self._array = shmarr self._array = shmarr
self._i = counter
# indexes for first and last indices corresponding
# to fille data
self._first = first
self._last = last
self._len = len(shmarr) self._len = len(shmarr)
self._shm = shm self._shm = shm
self._readonly = readonly
# pushing data does not write the index (aka primary key)
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
# TODO: ringbuf api? # TODO: ringbuf api?
@ -158,24 +176,25 @@ class ShmArray:
def _token(self) -> _Token: def _token(self) -> _Token:
return _Token( return _Token(
self._shm.name, self._shm.name,
self._i._shm.name, self._first._shm.name,
self._last._shm.name,
self._array.dtype.descr, self._array.dtype.descr,
) )
@property @property
def token(self) -> dict: def token(self) -> dict:
"""Shared memory token that can be serialized """Shared memory token that can be serialized and used by
and used by another process to attach to this array. another process to attach to this array.
""" """
return self._token.as_msg() return self._token.as_msg()
@property @property
def index(self) -> int: def index(self) -> int:
return self._i.value % self._len return self._last.value % self._len
@property @property
def array(self) -> np.ndarray: def array(self) -> np.ndarray:
return self._array[:self._i.value] return self._array[self._first.value:self._last.value]
def last( def last(
self, self,
@ -186,38 +205,90 @@ class ShmArray:
def push( def push(
self, self,
data: np.ndarray, data: np.ndarray,
prepend: bool = False,
) -> int: ) -> int:
"""Ring buffer like "push" to append data """Ring buffer like "push" to append data
into the buffer and return updated index. into the buffer and return updated "last" index.
""" """
length = len(data) length = len(data)
# TODO: use .index for actual ring logic?
index = self._i.value if prepend:
index = self._first.value - length
else:
index = self._last.value
end = index + length end = index + length
self._array[index:end] = data[:]
self._i.value = end fields = self._write_fields
return end
try:
self._array[fields][index:end] = data[fields][:]
if prepend:
self._first.value = index
else:
self._last.value = end
return end
except ValueError as err:
# shoudl raise if diff detected
self.diff_err_fields(data)
raise err
def diff_err_fields(
self,
data: np.ndarray,
) -> None:
# reraise with any field discrepancy
our_fields, their_fields = (
set(self._array.dtype.fields),
set(data.dtype.fields),
)
only_in_ours = our_fields - their_fields
only_in_theirs = their_fields - our_fields
if only_in_ours:
raise TypeError(
f"Input array is missing field(s): {only_in_ours}"
)
elif only_in_theirs:
raise TypeError(
f"Input array has unknown field(s): {only_in_theirs}"
)
def prepend(
self,
data: np.ndarray,
) -> int:
end = self.push(data, prepend=True)
assert end
def close(self) -> None: def close(self) -> None:
self._i._shm.close() self._first._shm.close()
self._last._shm.close()
self._shm.close() self._shm.close()
def destroy(self) -> None: def destroy(self) -> None:
if shared_memory._USE_POSIX: if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker" # We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems. # nonsense meant for non-SC systems.
shm_unlink(self._shm.name) shm_unlink(self._shm.name)
self._i.destroy()
self._first.destroy()
self._last.destroy()
def flush(self) -> None: def flush(self) -> None:
# TODO: flush to storage backend like markestore? # TODO: flush to storage backend like markestore?
... ...
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 12)
_default_size = 2 * _secs_in_day
def open_shm_array( def open_shm_array(
key: Optional[str] = None, key: Optional[str] = None,
# approx number of 5s bars in a "day" x2 size: int = _default_size,
size: int = int(2*60*60*10/5),
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
readonly: bool = False, readonly: bool = False,
) -> ShmArray: ) -> ShmArray:
@ -229,7 +300,9 @@ def open_shm_array(
# create new shared mem segment for which we # create new shared mem segment for which we
# have write permission # have write permission
a = np.zeros(size, dtype=dtype) a = np.zeros(size, dtype=dtype)
shm = shared_memory.SharedMemory( a['index'] = np.arange(len(a))
shm = SharedMemory(
name=key, name=key,
create=True, create=True,
size=a.nbytes size=a.nbytes
@ -243,17 +316,30 @@ def open_shm_array(
dtype=dtype dtype=dtype
) )
counter = SharedInt( # create single entry arrays for storing an first and last indices
token=token.shm_counter_name, first = SharedInt(
create=True, shm=SharedMemory(
name=token.shm_first_index_name,
create=True,
size=4, # std int
)
) )
counter.value = 0
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=True,
size=4, # std int
)
)
last.value = first.value = int(_secs_in_day)
shmarr = ShmArray( shmarr = ShmArray(
array, array,
counter, first,
last,
shm, shm,
readonly=readonly,
) )
assert shmarr._token == token assert shmarr._token == token
@ -261,26 +347,31 @@ def open_shm_array(
# "unlink" created shm on process teardown by # "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack # pushing teardown calls onto actor context stack
actor = tractor.current_actor() tractor._actor._lifetime_stack.callback(shmarr.close)
actor._lifetime_stack.callback(shmarr.close) tractor._actor._lifetime_stack.callback(shmarr.destroy)
actor._lifetime_stack.callback(shmarr.destroy)
return shmarr return shmarr
def attach_shm_array( def attach_shm_array(
token: Tuple[str, str, Tuple[str, str]], token: Tuple[str, str, Tuple[str, str]],
size: int = int(60*60*10/5), size: int = _default_size,
readonly: bool = True, readonly: bool = True,
) -> ShmArray: ) -> ShmArray:
"""Load and attach to an existing shared memory array previously """Attach to an existing shared memory array previously
created by another process using ``open_shared_array``. created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write
access are constructed.
""" """
token = _Token.from_msg(token) token = _Token.from_msg(token)
key = token.shm_name key = token.shm_name
if key in _known_tokens: if key in _known_tokens:
assert _known_tokens[key] == token, "WTF" assert _known_tokens[key] == token, "WTF"
shm = shared_memory.SharedMemory(name=key) # attach to array buffer and view as per dtype
shm = SharedMemory(name=key)
shmarr = np.ndarray( shmarr = np.ndarray(
(size,), (size,),
dtype=token.dtype_descr, dtype=token.dtype_descr,
@ -288,15 +379,29 @@ def attach_shm_array(
) )
shmarr.setflags(write=int(not readonly)) shmarr.setflags(write=int(not readonly))
counter = SharedInt(token=token.shm_counter_name) first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=False,
size=4, # std int
),
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=False,
size=4, # std int
),
)
# make sure we can read # make sure we can read
counter.value first.value
sha = ShmArray( sha = ShmArray(
shmarr, shmarr,
counter, first,
last,
shm, shm,
readonly=readonly,
) )
# read test # read test
sha.array sha.array
@ -308,8 +413,8 @@ def attach_shm_array(
_known_tokens[key] = token _known_tokens[key] = token
# "close" attached shm on process teardown # "close" attached shm on process teardown
actor = tractor.current_actor() tractor._actor._lifetime_stack.callback(sha.close)
actor._lifetime_stack.callback(sha.close)
return sha return sha
@ -318,21 +423,20 @@ def maybe_open_shm_array(
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
**kwargs, **kwargs,
) -> Tuple[ShmArray, bool]: ) -> Tuple[ShmArray, bool]:
"""Attempt to attach to a shared memory block by a """Attempt to attach to a shared memory block using a "key" lookup
"key" determined by the users overall "system" to registered blocks in the users overall "system" registryt
(presumes you don't have the block's explicit token). (presumes you don't have the block's explicit token).
This function is meant to solve the problem of This function is meant to solve the problem of discovering whether
discovering whether a shared array token has been a shared array token has been allocated or discovered by the actor
allocated or discovered by the actor running in running in **this** process. Systems where multiple actors may seek
**this** process. Systems where multiple actors to access a common block can use this function to attempt to acquire
may seek to access a common block can use this a token as discovered by the actors who have previously stored
function to attempt to acquire a token as discovered a "key" -> ``_Token`` map in an actor local (aka python global)
by the actors who have previously stored a variable.
"key" -> ``_Token`` map in an actor local variable.
If you know the explicit ``_Token`` for your memory If you know the explicit ``_Token`` for your memory segment instead
instead use ``attach_shm_array``. use ``attach_shm_array``.
""" """
try: try:
# see if we already know this key # see if we already know this key

View File

@ -15,27 +15,36 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Numpy data source machinery. numpy data source coversion helpers.
""" """
import decimal import decimal
from dataclasses import dataclass from dataclasses import dataclass
import numpy as np import numpy as np
import pandas as pd import pandas as pd
# from numba import from_dtype
ohlc_fields = [
('time', float),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', int),
('bar_wap', float),
]
ohlc_with_index = ohlc_fields.copy()
ohlc_with_index.insert(0, ('index', int))
# our minimum structured array layout for ohlc data # our minimum structured array layout for ohlc data
base_ohlc_dtype = np.dtype( base_iohlc_dtype = np.dtype(ohlc_with_index)
[ base_ohlc_dtype = np.dtype(ohlc_fields)
('index', int),
('time', float), # TODO: for now need to construct this manually for readonly arrays, see
('open', float), # https://github.com/numba/numba/issues/4511
('high', float), # numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
('low', float),
('close', float),
('volume', int),
]
)
# map time frame "keys" to minutes values # map time frame "keys" to minutes values
tf_in_1m = { tf_in_1m = {
@ -110,18 +119,27 @@ def from_df(
'Low': 'low', 'Low': 'low',
'Close': 'close', 'Close': 'close',
'Volume': 'volume', 'Volume': 'volume',
# most feeds are providing this over sesssion anchored
'vwap': 'bar_wap',
# XXX: ib_insync calls this the "wap of the bar"
# but no clue what is actually is...
# https://github.com/pikers/piker/issues/119#issuecomment-729120988
'average': 'bar_wap',
} }
df = df.rename(columns=columns) df = df.rename(columns=columns)
for name in df.columns: for name in df.columns:
if name not in base_ohlc_dtype.names[1:]: # if name not in base_ohlc_dtype.names[1:]:
if name not in base_ohlc_dtype.names:
del df[name] del df[name]
# TODO: it turns out column access on recarrays is actually slower: # TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays? # it might make sense to make these structured arrays?
array = df.to_records() array = df.to_records(index=False)
_nan_to_closest_num(array) _nan_to_closest_num(array)
return array return array

View File

@ -20,18 +20,24 @@ Financial signal processing for the peeps.
from typing import AsyncIterator, Callable, Tuple from typing import AsyncIterator, Callable, Tuple
import trio import trio
from trio_typing import TaskStatus
import tractor import tractor
import numpy as np import numpy as np
from ..log import get_logger from ..log import get_logger
from .. import data from .. import data
from ._momo import _rsi from ._momo import _rsi, _wma
from ._volume import _tina_vwap
from ..data import attach_shm_array, Feed from ..data import attach_shm_array, Feed
log = get_logger(__name__) log = get_logger(__name__)
_fsps = {'rsi': _rsi} _fsps = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
}
async def latency( async def latency(
@ -70,7 +76,7 @@ async def increment_signals(
# write new slot to the buffer # write new slot to the buffer
dst_shm.push(last) dst_shm.push(last)
len(dst_shm.array)
@tractor.stream @tractor.stream
@ -95,66 +101,107 @@ async def cascade(
async with data.open_feed(brokername, [symbol]) as feed: async with data.open_feed(brokername, [symbol]) as feed:
assert src.token == feed.shm.token assert src.token == feed.shm.token
# TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream): async def fsp_compute(
async for quotes in stream: task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
for symbol, quotes in quotes.items(): ) -> None:
if symbol == sym:
yield quotes
out_stream = func( # TODO: load appropriate fsp with input args
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# TODO: XXX: async def filter_by_sym(
# THERE'S A BIG BUG HERE WITH THE `index` field since we're sym: str,
# prepending a copy of the first value a few times to make stream,
# sub-curves align with the parent bar chart. ):
# # task cancellation won't kill the channel
# This likely needs to be fixed either by, with stream.shield():
# - manually assigning the index and historical data async for quotes in stream:
# seperately to the shm array (i.e. not using .push()) for symbol, quotes in quotes.items():
# - developing some system on top of the shared mem array that if symbol == sym:
# is `index` aware such that historical data can be indexed yield quotes
# relative to the true first datum? Not sure if this is sane
# for derivatives.
# Conduct a single iteration of fsp with historical bars input out_stream = func(
# and get historical output filter_by_sym(symbol, feed.stream),
history_output = await out_stream.__anext__() feed.shm,
)
# build a struct array which includes an 'index' field to push # TODO: XXX:
# as history # THERE'S A BIG BUG HERE WITH THE `index` field since we're
history = np.array( # prepending a copy of the first value a few times to make
np.arange(len(history_output)), # sub-curves align with the parent bar chart.
dtype=dst.array.dtype # This likely needs to be fixed either by,
) # - manually assigning the index and historical data
history[fsp_func_name] = history_output # seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# TODO: talk to ``pyqtgraph`` core about proper way to solve this: # Conduct a single iteration of fsp with historical bars input
# XXX: hack to get curves aligned with bars graphics: prepend # and get historical output
# a copy of the first datum.. history_output = await out_stream.__anext__()
# dst.push(history[:1])
# check for data length mis-allignment and fill missing values # build a struct array which includes an 'index' field to push
diff = len(src.array) - len(history) # as history
if diff >= 0: history = np.array(
print(f"WTF DIFFZZZ {diff}") np.arange(len(history_output)),
for _ in range(diff): dtype=dst.array.dtype
dst.push(history[:1]) )
history[fsp_func_name] = history_output
# compare with source signal and time align
index = dst.push(history)
yield index # check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
last_len = new_len = len(src.array)
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
n.start_soon(increment_signals, feed, dst)
async for processed in out_stream: cs = await n.start(fsp_compute)
log.debug(f"{fsp_func_name}: {processed}")
index = src.index # Increment the underlying shared memory buffer on every "increment"
dst.array[-1][fsp_func_name] = processed # msg received from the underlying data feed.
await ctx.send_yield(index)
async for msg in await feed.index_stream():
new_len = len(src.array)
if new_len > last_len + 1:
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_compute)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
array = dst.array
last = array[-1:].copy()
# write new slot to the buffer
dst.push(last)
last_len = new_len

View File

@ -151,8 +151,8 @@ def wma(
return np.convolve(signal, weights, 'valid') return np.convolve(signal, weights, 'valid')
# @piker.fsp( # @piker.fsp.signal(
# aggregates=[60, 60*5, 60*60, '4H', '1D'], # timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# ) # )
async def _rsi( async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa source: 'QuoteStream[Dict[str, Any]]', # noqa

View File

@ -0,0 +1,93 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import AsyncIterator, Optional
import numpy as np
from ..data._normalize import iterticks
def wap(
signal: np.ndarray,
weights: np.ndarray,
) -> np.ndarray:
"""Weighted average price from signal and weights.
"""
cum_weights = np.cumsum(weights)
cum_weighted_input = np.cumsum(signal * weights)
# cum_weighted_input / cum_weights
# but, avoid divide by zero errors
avg = np.divide(
cum_weighted_input,
cum_weights,
where=cum_weights != 0
)
return (
avg,
cum_weighted_input,
cum_weights,
)
async def _tina_vwap(
source, #: AsyncStream[np.ndarray],
ohlcv: np.ndarray, # price time-frame "aware"
anchors: Optional[np.ndarray] = None,
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming volume weighted moving average.
Calling this "tina" for now since we're using HLC3 instead of tick.
"""
if anchors is None:
# TODO:
# anchor to session start of data if possible
pass
a = ohlcv.array
chl3 = (a['close'] + a['high'] + a['low']) / 3
v = a['volume']
h_vwap, cum_wp, cum_v = wap(chl3, v)
# deliver historical output as "first yield"
yield h_vwap
w_tot = cum_wp[-1]
v_tot = cum_v[-1]
# vwap_tot = h_vwap[-1]
async for quote in source:
for tick in iterticks(quote, types=['trade']):
# c, h, l, v = ohlcv.array[-1][
# ['closes', 'high', 'low', 'volume']
# ]
# this computes tick-by-tick weightings from here forward
size = tick['size']
price = tick['price']
v_tot += size
w_tot += price * size
# yield ((((o + h + l) / 3) * v) weights_tot) / v_tot
yield w_tot / v_tot

View File

@ -38,7 +38,7 @@ class Axis(pg.AxisItem):
def __init__( def __init__(
self, self,
linked_charts, linked_charts,
typical_max_str: str = '100 000.00', typical_max_str: str = '100 000.000',
min_tick: int = 2, min_tick: int = 2,
**kwargs **kwargs
) -> None: ) -> None:
@ -51,6 +51,8 @@ class Axis(pg.AxisItem):
self.setStyle(**{ self.setStyle(**{
'textFillLimits': [(0, 0.666)], 'textFillLimits': [(0, 0.666)],
'tickFont': _font.font, 'tickFont': _font.font,
# offset of text *away from* axis line in px
'tickTextOffset': 2,
}) })
self.setTickFont(_font.font) self.setTickFont(_font.font)
@ -88,11 +90,10 @@ class PriceAxis(Axis):
# print(f'digits: {digits}') # print(f'digits: {digits}')
return [ return [
('{value:,.{digits}f}') ('{value:,.{digits}f}').format(
.format( digits=digits,
digits=digits, value=v,
value=v, ).replace(',', ' ') for v in vals
).replace(',', ' ') for v in vals
] ]
@ -104,23 +105,36 @@ class DynamicDateAxis(Axis):
60: '%H:%M', 60: '%H:%M',
30: '%H:%M:%S', 30: '%H:%M:%S',
5: '%H:%M:%S', 5: '%H:%M:%S',
1: '%H:%M:%S',
} }
def resize(self) -> None: def resize(self) -> None:
self.setHeight(self.typical_br.height() + 3) self.setHeight(self.typical_br.height() + 1)
def _indexes_to_timestrs( def _indexes_to_timestrs(
self, self,
indexes: List[int], indexes: List[int],
) -> List[str]: ) -> List[str]:
bars = self.linked_charts.chart._array # try:
chart = self.linked_charts.chart
bars = chart._ohlc
shm = self.linked_charts.chart._shm
first = shm._first.value
bars_len = len(bars) bars_len = len(bars)
times = bars['time'] times = bars['time']
epochs = times[list( epochs = times[list(
map(int, filter(lambda i: i < bars_len, indexes)) map(
int,
filter(
lambda i: i > 0 and i < bars_len,
(i-first for i in indexes)
)
)
)] )]
# TODO: **don't** have this hard coded shift to EST # TODO: **don't** have this hard coded shift to EST
dts = pd.to_datetime(epochs, unit='s') # - 4*pd.offsets.Hour() dts = pd.to_datetime(epochs, unit='s') # - 4*pd.offsets.Hour()
@ -228,6 +242,7 @@ class AxisLabel(pg.GraphicsObject):
class XAxisLabel(AxisLabel): class XAxisLabel(AxisLabel):
_w_margin = 4
text_flags = ( text_flags = (
QtCore.Qt.TextDontClip QtCore.Qt.TextDontClip
@ -255,18 +270,17 @@ class XAxisLabel(AxisLabel):
w = self.boundingRect().width() w = self.boundingRect().width()
self.setPos(QPointF( self.setPos(QPointF(
abs_pos.x() - w / 2 - offset, abs_pos.x() - w / 2 - offset,
0, 1,
)) ))
self.update() self.update()
class YAxisLabel(AxisLabel): class YAxisLabel(AxisLabel):
_h_margin = 3 _h_margin = 2
# _w_margin = 1
text_flags = ( text_flags = (
# QtCore.Qt.AlignLeft QtCore.Qt.AlignLeft
QtCore.Qt.AlignHCenter # QtCore.Qt.AlignHCenter
| QtCore.Qt.AlignVCenter | QtCore.Qt.AlignVCenter
| QtCore.Qt.TextDontClip | QtCore.Qt.TextDontClip
) )
@ -283,13 +297,13 @@ class YAxisLabel(AxisLabel):
) -> None: ) -> None:
# this is read inside ``.paint()`` # this is read inside ``.paint()``
self.label_str = '{value:,.{digits}f}'.format( self.label_str = ' {value:,.{digits}f}'.format(
digits=self.digits, value=value).replace(',', ' ') digits=self.digits, value=value).replace(',', ' ')
br = self.boundingRect() br = self.boundingRect()
h = br.height() h = br.height()
self.setPos(QPointF( self.setPos(QPointF(
0, 1,
abs_pos.y() - h / 2 - offset abs_pos.y() - h / 2 - offset
)) ))
self.update() self.update()

File diff suppressed because it is too large Load Diff

View File

@ -20,12 +20,15 @@ Trio - Qt integration
Run ``trio`` in guest mode on top of the Qt event loop. Run ``trio`` in guest mode on top of the Qt event loop.
All global Qt runtime settings are mostly defined here. All global Qt runtime settings are mostly defined here.
""" """
import os
import signal
from functools import partial from functools import partial
import traceback import traceback
from typing import Tuple, Callable, Dict, Any from typing import Tuple, Callable, Dict, Any
# Qt specific # Qt specific
import PyQt5 # noqa import PyQt5 # noqa
import pyqtgraph as pg
from pyqtgraph import QtGui from pyqtgraph import QtGui
from PyQt5 import QtCore from PyQt5 import QtCore
from PyQt5.QtCore import ( from PyQt5.QtCore import (
@ -37,6 +40,12 @@ import tractor
from outcome import Error from outcome import Error
# pyqtgraph global config
# might as well enable this for now?
pg.useOpenGL = True
pg.enableExperimental = True
# singleton app per actor # singleton app per actor
_qt_app: QtGui.QApplication = None _qt_app: QtGui.QApplication = None
_qt_win: QtGui.QMainWindow = None _qt_win: QtGui.QMainWindow = None
@ -67,6 +76,17 @@ class MainWindow(QtGui.QMainWindow):
self.setMinimumSize(*self.size) self.setMinimumSize(*self.size)
self.setWindowTitle(self.title) self.setWindowTitle(self.title)
def closeEvent(
self,
event: 'QCloseEvent'
) -> None:
"""Cancel the root actor asap.
"""
# raising KBI seems to get intercepted by by Qt so just use the
# system.
os.kill(os.getpid(), signal.SIGINT)
def run_qtractor( def run_qtractor(
func: Callable, func: Callable,
@ -115,11 +135,15 @@ def run_qtractor(
def done_callback(outcome): def done_callback(outcome):
print(f"Outcome: {outcome}")
if isinstance(outcome, Error): if isinstance(outcome, Error):
exc = outcome.error exc = outcome.error
traceback.print_exception(type(exc), exc, exc.__traceback__)
if isinstance(outcome.error, KeyboardInterrupt):
# make it kinda look like ``trio``
print("Terminated!")
else:
traceback.print_exception(type(exc), exc, exc.__traceback__)
app.quit() app.quit()
@ -154,6 +178,7 @@ def run_qtractor(
main, main,
run_sync_soon_threadsafe=run_sync_soon_threadsafe, run_sync_soon_threadsafe=run_sync_soon_threadsafe,
done_callback=done_callback, done_callback=done_callback,
# restrict_keyboard_interrupt_to_checkpoints=True,
) )
window.main_widget = main_widget window.main_widget = main_widget

View File

@ -17,39 +17,44 @@
""" """
Chart graphics for displaying a slew of different data types. Chart graphics for displaying a slew of different data types.
""" """
import inspect
# import time
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
import numpy as np import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
# from numba import jit, float64, optional, int64 from numba import jit, float64, int64 # , optional
# from numba import types as ntypes
from PyQt5 import QtCore, QtGui from PyQt5 import QtCore, QtGui
from PyQt5.QtCore import QLineF, QPointF from PyQt5.QtCore import QLineF, QPointF
# from .._profile import timeit from .._profile import timeit
# from ..data._source import numba_ohlc_dtype
from ._style import ( from ._style import (
_xaxis_at, _xaxis_at,
hcolor, hcolor,
_font, _font,
_down_2_font_inches_we_like,
) )
from ._axes import YAxisLabel, XAxisLabel, YSticky from ._axes import YAxisLabel, XAxisLabel, YSticky
# XXX: these settings seem to result in really decent mouse scroll # XXX: these settings seem to result in really decent mouse scroll
# latency (in terms of perceived lag in cross hair) so really be sure # latency (in terms of perceived lag in cross hair) so really be sure
# there's an improvement if you want to change it. # there's an improvement if you want to change it!
_mouse_rate_limit = 60 # calc current screen refresh rate? _mouse_rate_limit = 60 # TODO; should we calc current screen refresh rate?
_debounce_delay = 1 / 2e3 _debounce_delay = 1 / 2e3
_ch_label_opac = 1 _ch_label_opac = 1
# TODO: we need to handle the case where index is outside
# the underlying datums range
class LineDot(pg.CurvePoint): class LineDot(pg.CurvePoint):
def __init__( def __init__(
self, self,
curve: pg.PlotCurveItem, curve: pg.PlotCurveItem,
index: int, index: int,
plot: 'ChartPlotWidget',
pos=None, pos=None,
size: int = 2, # in pxs size: int = 2, # in pxs
color: str = 'default_light', color: str = 'default_light',
@ -61,6 +66,7 @@ class LineDot(pg.CurvePoint):
pos=pos, pos=pos,
rotate=False, rotate=False,
) )
self._plot = plot
# TODO: get pen from curve if not defined? # TODO: get pen from curve if not defined?
cdefault = hcolor(color) cdefault = hcolor(color)
@ -80,6 +86,31 @@ class LineDot(pg.CurvePoint):
# keep a static size # keep a static size
self.setFlag(self.ItemIgnoresTransformations) self.setFlag(self.ItemIgnoresTransformations)
def event(
self,
ev: QtCore.QEvent,
) -> None:
# print((ev, type(ev)))
if not isinstance(ev, QtCore.QDynamicPropertyChangeEvent) or self.curve() is None:
return False
# if ev.propertyName() == 'index':
# print(ev)
# # self.setProperty
(x, y) = self.curve().getData()
index = self.property('index')
# first = self._plot._ohlc[0]['index']
# first = x[0]
# i = index - first
i = index - x[0]
if i > 0 and i < len(y):
newPos = (index, y[i])
QtGui.QGraphicsItem.setPos(self, *newPos)
return True
return False
_corner_anchors = { _corner_anchors = {
'top': 0, 'top': 0,
@ -91,8 +122,9 @@ _corner_anchors = {
_corner_margins = { _corner_margins = {
('top', 'left'): (-4, -5), ('top', 'left'): (-4, -5),
('top', 'right'): (4, -5), ('top', 'right'): (4, -5),
('bottom', 'left'): (-4, 5),
('bottom', 'right'): (4, 5), ('bottom', 'left'): (-4, lambda font_size: font_size * 2),
('bottom', 'right'): (4, lambda font_size: font_size * 2),
} }
@ -109,7 +141,10 @@ class ContentsLabel(pg.LabelItem):
font_size: Optional[int] = None, font_size: Optional[int] = None,
) -> None: ) -> None:
font_size = font_size or _font.font.pixelSize() font_size = font_size or _font.font.pixelSize()
super().__init__(justify=justify_text, size=f'{str(font_size)}px') super().__init__(
justify=justify_text,
size=f'{str(font_size)}px'
)
# anchor to viewbox # anchor to viewbox
self.setParentItem(chart._vb) self.setParentItem(chart._vb)
@ -120,6 +155,10 @@ class ContentsLabel(pg.LabelItem):
index = (_corner_anchors[h], _corner_anchors[v]) index = (_corner_anchors[h], _corner_anchors[v])
margins = _corner_margins[(v, h)] margins = _corner_margins[(v, h)]
ydim = margins[1]
if inspect.isfunction(margins[1]):
margins = margins[0], ydim(font_size)
self.anchor(itemPos=index, parentPos=index, offset=margins) self.anchor(itemPos=index, parentPos=index, offset=margins)
def update_from_ohlc( def update_from_ohlc(
@ -129,15 +168,19 @@ class ContentsLabel(pg.LabelItem):
array: np.ndarray, array: np.ndarray,
) -> None: ) -> None:
# this being "html" is the dumbest shit :eyeroll: # this being "html" is the dumbest shit :eyeroll:
first = array[0]['index']
self.setText( self.setText(
"<b>i</b>:{index}<br/>" "<b>i</b>:{index}<br/>"
"<b>O</b>:{}<br/>" "<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>" "<b>H</b>:{}<br/>"
"<b>L</b>:{}<br/>" "<b>L</b>:{}<br/>"
"<b>C</b>:{}<br/>" "<b>C</b>:{}<br/>"
"<b>V</b>:{}".format( "<b>V</b>:{}<br/>"
# *self._array[index].item()[2:8], "<b>wap</b>:{}".format(
*array[index].item()[2:8], *array[index - first][
['open', 'high', 'low', 'close', 'volume', 'bar_wap']
],
name=name, name=name,
index=index, index=index,
) )
@ -149,8 +192,10 @@ class ContentsLabel(pg.LabelItem):
index: int, index: int,
array: np.ndarray, array: np.ndarray,
) -> None: ) -> None:
data = array[index][name] first = array[0]['index']
self.setText(f"{name}: {data:.2f}") if index < array[-1]['index'] and index > first:
data = array[index - first][name]
self.setText(f"{name}: {data:.2f}")
class CrossHair(pg.GraphicsObject): class CrossHair(pg.GraphicsObject):
@ -246,7 +291,7 @@ class CrossHair(pg.GraphicsObject):
) -> LineDot: ) -> LineDot:
# if this plot contains curves add line dot "cursors" to denote # if this plot contains curves add line dot "cursors" to denote
# the current sample under the mouse # the current sample under the mouse
cursor = LineDot(curve, index=len(plot._array)) cursor = LineDot(curve, index=plot._ohlc[-1]['index'], plot=plot)
plot.addItem(cursor) plot.addItem(cursor)
self.graphics[plot].setdefault('cursors', []).append(cursor) self.graphics[plot].setdefault('cursors', []).append(cursor)
return cursor return cursor
@ -308,8 +353,9 @@ class CrossHair(pg.GraphicsObject):
plot.update_contents_labels(ix) plot.update_contents_labels(ix)
# update all subscribed curve dots # update all subscribed curve dots
# first = plot._ohlc[0]['index']
for cursor in opts.get('cursors', ()): for cursor in opts.get('cursors', ()):
cursor.setIndex(ix) cursor.setIndex(ix) # - first)
# update the label on the bottom of the crosshair # update the label on the bottom of the crosshair
self.xaxis_label.update_label( self.xaxis_label.update_label(
@ -332,96 +378,127 @@ class CrossHair(pg.GraphicsObject):
return self.plots[0].boundingRect() return self.plots[0].boundingRect()
# @jit( def _mk_lines_array(
# # float64[:]( data: List,
# # float64[:], size: int,
# # optional(float64), elements_step: int = 6,
# # optional(int16) ) -> np.ndarray:
# # ), """Create an ndarray to hold lines graphics info.
# nopython=True,
# nogil=True
# )
def _mk_lines_array(data: List, size: int) -> np.ndarray:
"""Create an ndarray to hold lines graphics objects.
""" """
return np.zeros_like( return np.zeros_like(
data, data,
shape=(int(size), 3), shape=(int(size), elements_step),
dtype=object, dtype=object,
) )
# TODO: `numba` this? def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]:
open, high, low, close, index = row[
['open', 'high', 'low', 'close', 'index']]
# @jit( # high -> low vertical (body) line
# # float64[:]( if low != high:
# # float64[:], hl = QLineF(index, low, index, high)
# # optional(float64), else:
# # optional(int16) # XXX: if we don't do it renders a weird rectangle?
# # ), # see below for filtering this later...
# nopython=True, hl = None
# nogil=True
# ) # NOTE: place the x-coord start as "middle" of the drawing range such
def bars_from_ohlc( # that the open arm line-graphic is at the left-most-side of
# the index's range according to the view mapping.
# open line
o = QLineF(index - w, open, index, open)
# close line
c = QLineF(index, close, index + w, close)
return [hl, o, c]
@jit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# ntypes.Tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nopython=True,
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray, data: np.ndarray,
w: float, start: int64,
start: int = 0, bar_gap: float64 = 0.43,
) -> np.ndarray: ) -> np.ndarray:
"""Generate an array of lines objects from input ohlc data. """Generate an array of lines objects from input ohlc data.
""" """
lines = _mk_lines_array(data, data.shape[0]) size = int(data.shape[0] * 6)
for i, q in enumerate(data[start:], start=start): x = np.zeros(
open, high, low, close, index = q[ # data,
['open', 'high', 'low', 'close', 'index']] shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# high -> low vertical (body) line # TODO: report bug for assert @
if low != high: # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
hl = QLineF(index, low, index, high) for i, q in enumerate(data[start:], start):
else:
# XXX: if we don't do it renders a weird rectangle?
# see below for filtering this later...
hl = None
# NOTE: place the x-coord start as "middle" of the drawing range such # TODO: ask numba why this doesn't work..
# that the open arm line-graphic is at the left-most-side of # open, high, low, close, index = q[
# the index's range according to the view mapping. # ['open', 'high', 'low', 'close', 'index']]
# open line open = q['open']
o = QLineF(index - w, open, index, open) high = q['high']
# close line low = q['low']
c = QLineF(index, close, index + w, close) close = q['close']
index = float64(q['index'])
# indexing here is as per the below comments istart = i * 6
lines[i] = (hl, o, c) istop = istart + 6
# XXX: in theory we could get a further speedup by using a flat # x,y detail the 6 points which connect all vertexes of a ohlc bar
# array and avoiding the call to `np.ravel()` below? x[istart:istop] = (
# lines[3*i:3*i+3] = (hl, o, c) index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# XXX: legacy code from candles custom graphics: # specifies that the first edge is never connected to the
# if not _tina_mode: # prior bars last edge thus providing a small "gap"/"space"
# else _tina_mode: # between bars determined by ``bar_gap``.
# self.lines = lines = np.concatenate( c[istart:istop] = (0, 1, 1, 1, 1, 1)
# [high_to_low, open_sticks, close_sticks])
# use traditional up/down green/red coloring
# long_bars = np.resize(Quotes.close > Quotes.open, len(lines))
# short_bars = np.resize(
# Quotes.close < Quotes.open, len(lines))
# ups = lines[long_bars] return x, y, c
# downs = lines[short_bars]
# # draw "up" bars
# p.setPen(self.bull_brush)
# p.drawLines(*ups)
# # draw "down" bars # @timeit
# p.setPen(self.bear_brush) def gen_qpath(
# p.drawLines(*downs) data,
start, # XXX: do we need this?
w,
) -> QtGui.QPainterPath:
return lines x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w)
# TODO: numba the internals of this!
return pg.functions.arrayToQPath(x, y, connect=c)
class BarItems(pg.GraphicsObject): class BarItems(pg.GraphicsObject):
@ -431,11 +508,10 @@ class BarItems(pg.GraphicsObject):
# 0.5 is no overlap between arms, 1.0 is full overlap # 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43 w: float = 0.43
bars_pen = pg.mkPen(hcolor('bracket'))
# XXX: tina mode, see below # XXX: for the mega-lulz increasing width here increases draw latency...
# bull_brush = pg.mkPen('#00cc00') # so probably don't do it until we figure that out.
# bear_brush = pg.mkPen('#fa0000') bars_pen = pg.mkPen(hcolor('bracket'))
def __init__( def __init__(
self, self,
@ -443,95 +519,87 @@ class BarItems(pg.GraphicsObject):
plotitem: 'pg.PlotItem', # noqa plotitem: 'pg.PlotItem', # noqa
) -> None: ) -> None:
super().__init__() super().__init__()
self.last = QtGui.QPicture()
self.history = QtGui.QPicture() self.last_bar = QtGui.QPicture()
# TODO: implement updateable pixmap solution
self.path = QtGui.QPainterPath()
# self._h_path = QtGui.QGraphicsPathItem(self.path)
self._pi = plotitem self._pi = plotitem
# self._scene = plotitem.vb.scene()
# self.picture = QtGui.QPixmap(1000, 300) self._xrange: Tuple[int, int]
# plotitem.addItem(self.picture) self._yrange: Tuple[float, float]
# self._pmi = None
# self._pmi = self._scene.addPixmap(self.picture)
# XXX: not sure this actually needs to be an array other # XXX: not sure this actually needs to be an array other
# then for the old tina mode calcs for up/down bars below? # then for the old tina mode calcs for up/down bars below?
# lines container # lines container
self.lines = _mk_lines_array([], 50e3) # self.lines = _mk_lines_array([], 50e3, 6)
# TODO: don't render the full backing array each time
# self._path_data = None
self._last_bar_lines: Optional[Tuple[QLineF, ...]] = None
# track the current length of drawable lines within the larger array # track the current length of drawable lines within the larger array
self.index: int = 0 self.start_index: int = 0
self.stop_index: int = 0
# @timeit # @timeit
def draw_from_data( def draw_from_data(
self, self,
data: np.ndarray, data: np.ndarray,
start: int = 0, start: int = 0,
): ) -> QtGui.QPainterPath:
"""Draw OHLC datum graphics from a ``np.ndarray``. """Draw OHLC datum graphics from a ``np.ndarray``.
This routine is usually only called to draw the initial history. This routine is usually only called to draw the initial history.
""" """
lines = bars_from_ohlc(data, self.w, start=start) self.path = gen_qpath(data, start, self.w)
# save graphics for later reference and keep track # save graphics for later reference and keep track
# of current internal "last index" # of current internal "last index"
index = len(lines) # self.start_index = len(data)
self.lines[:index] = lines index = data['index']
self.index = index self._xrange = (index[0], index[-1])
self._yrange = (
np.nanmax(data['high']),
np.nanmin(data['low']),
)
# up to last to avoid double draw of last bar # up to last to avoid double draw of last bar
self.draw_lines(just_history=True, iend=self.index - 1) self._last_bar_lines = lines_from_ohlc(data[-1], self.w)
self.draw_lines(iend=self.index)
# @timeit # create pics
def draw_lines( # self.draw_history()
self, self.draw_last_bar()
istart=0,
iend=None,
just_history=False,
# TODO: could get even fancier and only update the single close line?
lines=None,
) -> None:
"""Draw the current line set using the painter.
"""
if just_history:
# draw bars for the "history" picture
iend = iend or self.index - 1
pic = self.history
else:
# draw the last bar
istart = self.index - 1
iend = iend or self.index
pic = self.last
# use 2d array of lines objects, see conlusion on speed: # trigger render
# https://stackoverflow.com/a/60089929
flat = np.ravel(self.lines[istart:iend])
# TODO: do this with numba for speed gain:
# https://stackoverflow.com/questions/58422690/filtering-a-numpy-array-what-is-the-best-approach
to_draw = flat[np.where(flat != None)] # noqa
# pre-computing a QPicture object allows paint() to run much
# more quickly, rather than re-drawing the shapes every time.
p = QtGui.QPainter(pic)
p.setPen(self.bars_pen)
# TODO: is there any way to not have to pass all the lines every
# iteration? It seems they won't draw unless it's done this way..
p.drawLines(*to_draw)
p.end()
# XXX: if we ever try using `QPixmap` again...
# if self._pmi is None:
# self._pmi = self.scene().addPixmap(self.picture)
# else:
# self._pmi.setPixmap(self.picture)
# trigger re-render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update # https://doc.qt.io/qt-5/qgraphicsitem.html#update
self.update() self.update()
return self.path
# def update_ranges(
# self,
# xmn: int,
# xmx: int,
# ymn: float,
# ymx: float,
# ) -> None:
# ...
def draw_last_bar(self) -> None:
"""Currently this draws lines to a cached ``QPicture`` which
is supposed to speed things up on ``.paint()`` calls (which
is a call to ``QPainter.drawPicture()`` but I'm not so sure.
"""
p = QtGui.QPainter(self.last_bar)
p.setPen(self.bars_pen)
p.drawLines(*tuple(filter(bool, self._last_bar_lines)))
p.end()
# @timeit
def update_from_array( def update_from_array(
self, self,
array: np.ndarray, array: np.ndarray,
@ -545,32 +613,65 @@ class BarItems(pg.GraphicsObject):
graphics object, and then update/rerender, but here we're graphics object, and then update/rerender, but here we're
assuming the prior graphics havent changed (OHLC history rarely assuming the prior graphics havent changed (OHLC history rarely
does) so this "should" be simpler and faster. does) so this "should" be simpler and faster.
This routine should be made (transitively) as fast as possible.
""" """
index = self.index # index = self.start_index
length = len(array) istart, istop = self._xrange
extra = length - index
# start_bar_to_update = index - 100 index = array['index']
first_index, last_index = index[0], index[-1]
# length = len(array)
prepend_length = istart - first_index
append_length = last_index - istop
# TODO: allow mapping only a range of lines thus
# only drawing as many bars as exactly specified.
if prepend_length:
# new history was added and we need to render a new path
new_bars = array[:prepend_length]
prepend_path = gen_qpath(new_bars, 0, self.w)
# XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# y value not matching the first value from
# array[prepend_length + 1] ???
# update path
old_path = self.path
self.path = prepend_path
self.path.addPath(old_path)
if append_length:
# generate new lines objects for updatable "current bar"
self._last_bar_lines = lines_from_ohlc(array[-1], self.w)
self.draw_last_bar()
if extra > 0:
# generate new graphics to match provided array # generate new graphics to match provided array
new = array[index:index + extra] # path appending logic:
lines = bars_from_ohlc(new, self.w) # we need to get the previous "current bar(s)" for the time step
bars_added = len(lines) # and convert it to a sub-path to append to the historical set
self.lines[index:index + bars_added] = lines # new_bars = array[istop - 1:istop + append_length - 1]
self.index += bars_added new_bars = array[-append_length - 1:-1]
append_path = gen_qpath(new_bars, 0, self.w)
self.path.moveTo(float(istop - self.w), float(new_bars[0]['open']))
self.path.addPath(append_path)
# start_bar_to_update = index - bars_added self._xrange = first_index, last_index
self.draw_lines(just_history=True)
if just_history:
return
# current bar update if just_history:
self.update()
return
# last bar update
i, o, h, l, last, v = array[-1][ i, o, h, l, last, v = array[-1][
['index', 'open', 'high', 'low', 'close', 'volume'] ['index', 'open', 'high', 'low', 'close', 'volume']
] ]
assert i == self.index - 1 # assert i == self.start_index - 1
body, larm, rarm = self.lines[i] assert i == last_index
body, larm, rarm = self._last_bar_lines
# XXX: is there a faster way to modify this? # XXX: is there a faster way to modify this?
rarm.setLine(rarm.x1(), last, rarm.x2(), last) rarm.setLine(rarm.x1(), last, rarm.x2(), last)
@ -579,16 +680,26 @@ class BarItems(pg.GraphicsObject):
if l != h: # noqa if l != h: # noqa
if body is None: if body is None:
body = self.lines[index - 1][0] = QLineF(i, l, i, h) body = self._last_bar_lines[0] = QLineF(i, l, i, h)
else: else:
# update body # update body
body.setLine(i, l, i, h) body.setLine(i, l, i, h)
else:
# XXX: h == l -> remove any HL line to avoid render bug
if body is not None:
body = self.lines[index - 1][0] = None
self.draw_lines(just_history=False) # XXX: pretty sure this is causing an issue where the bar has
# a large upward move right before the next sample and the body
# is getting set to None since the next bar is flat but the shm
# array index update wasn't read by the time this code runs. Iow
# we're doing this removal of the body for a bar index that is
# now out of date / from some previous sample. It's weird
# though because i've seen it do this to bars i - 3 back?
# else:
# # XXX: h == l -> remove any HL line to avoid render bug
# if body is not None:
# body = self.lines[index - 1][0] = None
self.draw_last_bar()
self.update()
# @timeit # @timeit
def paint(self, p, opt, widget): def paint(self, p, opt, widget):
@ -606,33 +717,36 @@ class BarItems(pg.GraphicsObject):
# as is necesarry for what's in "view". Not sure if this will # as is necesarry for what's in "view". Not sure if this will
# lead to any perf gains other then when zoomed in to less bars # lead to any perf gains other then when zoomed in to less bars
# in view. # in view.
p.drawPicture(0, 0, self.history) p.drawPicture(0, 0, self.last_bar)
p.drawPicture(0, 0, self.last)
# TODO: if we can ever make pixmaps work... p.setPen(self.bars_pen)
# p.drawPixmap(0, 0, self.picture) p.drawPath(self.path)
# self._pmi.setPixmap(self.picture)
# print(self.scene())
# profiler('bars redraw:')
# @timeit
def boundingRect(self): def boundingRect(self):
# TODO: can we do rect caching to make this faster?
# Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
# TODO: Can we do rect caching to make this faster
# like `pg.PlotCurveItem` does? In theory it's just
# computing max/min stuff again like we do in the udpate loop
# anyway. Not really sure it's necessary since profiling already
# shows this method is faf.
# boundingRect _must_ indicate the entire area that will be # boundingRect _must_ indicate the entire area that will be
# drawn on or else we will get artifacts and possibly crashing. # drawn on or else we will get artifacts and possibly crashing.
# (in this case, QPicture does all the work of computing the # (in this case, QPicture does all the work of computing the
# bounding rect for us). # bounding rect for us).
# compute aggregate bounding rectangle # compute aggregate bounding rectangle
lb = self.last.boundingRect() lb = self.last_bar.boundingRect()
hb = self.history.boundingRect() hb = self.path.boundingRect()
return QtCore.QRectF( return QtCore.QRectF(
# top left # top left
QtCore.QPointF(hb.topLeft()), QtCore.QPointF(hb.topLeft()),
# total size # total size
QtCore.QSizeF(lb.size() + hb.size()) QtCore.QSizeF(QtCore.QSizeF(lb.size()) + hb.size())
# QtCore.QSizeF(lb.size() + hb.size())
) )
@ -785,7 +899,7 @@ class L1Labels:
chart: 'ChartPlotWidget', # noqa chart: 'ChartPlotWidget', # noqa
digits: int = 2, digits: int = 2,
size_digits: int = 0, size_digits: int = 0,
font_size_inches: float = 4 / 53., font_size_inches: float = _down_2_font_inches_we_like,
) -> None: ) -> None:
self.chart = chart self.chart = chart
@ -839,7 +953,9 @@ def level_line(
digits: int = 1, digits: int = 1,
# size 4 font on 4k screen scaled down, so small-ish. # size 4 font on 4k screen scaled down, so small-ish.
font_size_inches: float = 4 / 53., font_size_inches: float = _down_2_font_inches_we_like,
show_label: bool = True,
**linelabelkwargs **linelabelkwargs
) -> LevelLine: ) -> LevelLine:
@ -859,6 +975,7 @@ def level_line(
**linelabelkwargs **linelabelkwargs
) )
label.update_from_data(0, level) label.update_from_data(0, level)
# TODO: can we somehow figure out a max value from the parent axis? # TODO: can we somehow figure out a max value from the parent axis?
label._size_br_from_str(label.label_str) label._size_br_from_str(label.label_str)
@ -874,4 +991,7 @@ def level_line(
chart.plotItem.addItem(line) chart.plotItem.addItem(line)
if not show_label:
label.hide()
return line return line

View File

@ -245,7 +245,7 @@ class ChartView(ViewBox):
log.debug("Max zoom bruh...") log.debug("Max zoom bruh...")
return return
if ev.delta() < 0 and vl >= len(self.linked_charts._array) + 666: if ev.delta() < 0 and vl >= len(self.linked_charts.chart._ohlc) + 666:
log.debug("Min zoom bruh...") log.debug("Min zoom bruh...")
return return
@ -268,9 +268,9 @@ class ChartView(ViewBox):
# ).map(furthest_right_coord) # ).map(furthest_right_coord)
# ) # )
# This seems like the most "intuitive option, a hybrdid of # This seems like the most "intuitive option, a hybrid of
# tws and tv styles # tws and tv styles
last_bar = pg.Point(rbar) last_bar = pg.Point(int(rbar))
self._resetTarget() self._resetTarget()
self.scaleBy(s, last_bar) self.scaleBy(s, last_bar)

View File

@ -18,6 +18,7 @@
Qt UI styling. Qt UI styling.
""" """
from typing import Optional from typing import Optional
import math
import pyqtgraph as pg import pyqtgraph as pg
from PyQt5 import QtCore, QtGui from PyQt5 import QtCore, QtGui
@ -27,10 +28,9 @@ from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
# chart-wide font # chart-wide fonts specified in inches
# font size 6px / 53 dpi (3x scaled down on 4k hidpi) _default_font_inches_we_like = 6 / 96
_default_font_inches_we_like = 6 / 53 # px / (px / inch) = inch _down_2_font_inches_we_like = 5 / 96
_down_2_font_inches_we_like = 4 / 53
class DpiAwareFont: class DpiAwareFont:
@ -66,8 +66,12 @@ class DpiAwareFont:
listed in the script in ``snippets/qt_screen_info.py``. listed in the script in ``snippets/qt_screen_info.py``.
""" """
dpi = screen.physicalDotsPerInch() # take the max since scaling can make things ugly in some cases
font_size = round(self._iwl * dpi) pdpi = screen.physicalDotsPerInch()
ldpi = screen.logicalDotsPerInch()
dpi = max(pdpi, ldpi)
font_size = math.floor(self._iwl * dpi)
log.info( log.info(
f"\nscreen:{screen.name()} with DPI: {dpi}" f"\nscreen:{screen.name()} with DPI: {dpi}"
f"\nbest font size is {font_size}\n" f"\nbest font size is {font_size}\n"