Merge pull request #130 from pikers/to_qpainterpath_and_beyond

To qpainterpath and beyond
kraken_history
goodboy 2020-12-21 13:01:27 -05:00 committed by GitHub
commit c5b2b6918b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1166 additions and 536 deletions

View File

@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``. ``infected_aio==True``.
""" """
from contextlib import asynccontextmanager, contextmanager from contextlib import asynccontextmanager
from dataclasses import asdict from dataclasses import asdict
from functools import partial from functools import partial
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
import asyncio import asyncio
import logging import logging
@ -32,6 +33,7 @@ import itertools
import time import time
from async_generator import aclosing from async_generator import aclosing
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails from ib_insync.contract import Contract, ContractDetails
from ib_insync.ticker import Ticker from ib_insync.ticker import Ticker
import ib_insync as ibis import ib_insync as ibis
@ -45,7 +47,7 @@ from ..data import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
iterticks, iterticks,
attach_shm_array, attach_shm_array,
get_shm_token, # get_shm_token,
subscribe_ohlc_for_increment, subscribe_ohlc_for_increment,
) )
from ..data._source import from_df from ..data._source import from_df
@ -86,6 +88,8 @@ _time_frames = {
'Y': 'OneYear', 'Y': 'OneYear',
} }
_show_wap_in_history = False
# overrides to sidestep pretty questionable design decisions in # overrides to sidestep pretty questionable design decisions in
# ``ib_insync``: # ``ib_insync``:
@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = {
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
} }
_enters = 0
class Client: class Client:
"""IB wrapped for our broker backend API. """IB wrapped for our broker backend API.
@ -142,32 +148,54 @@ class Client:
self.ib = ib self.ib = ib
self.ib.RaiseRequestErrors = True self.ib.RaiseRequestErrors = True
# NOTE: the ib.client here is "throttled" to 45 rps by default
async def bars( async def bars(
self, self,
symbol: str, symbol: str,
# EST in ISO 8601 format is required... below is EPOCH # EST in ISO 8601 format is required... below is EPOCH
start_date: str = "1970-01-01T00:00:00.000000-05:00", start_dt: str = "1970-01-01T00:00:00.000000-05:00",
time_frame: str = '1m', end_dt: str = "",
count: int = int(2e3), # <- max allowed per query
is_paid_feed: bool = False, sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]: ) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present. """Retreive OHLCV bars for a symbol over a range to the present.
""" """
bars_kwargs = {'whatToShow': 'TRADES'} bars_kwargs = {'whatToShow': 'TRADES'}
global _enters
print(f'ENTER BARS {_enters}')
_enters += 1
contract = await self.find_contract(symbol) contract = await self.find_contract(symbol)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count) # _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync( bars = await self.ib.reqHistoricalDataAsync(
contract, contract,
endDateTime='', endDateTime=end_dt,
# durationStr='60 S',
# durationStr='1 D', # time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
# OHLC sampling values:
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
# barSizeSetting='1 secs',
# durationStr='{count} S'.format(count=15000 * 5),
# durationStr='{count} D'.format(count=1),
# barSizeSetting='5 secs',
durationStr='{count} S'.format(count=period_count),
barSizeSetting='1 secs',
# barSizeSetting='1 min',
# time length calcs
durationStr='{count} S'.format(count=5000 * 5),
barSizeSetting='5 secs',
# always use extended hours # always use extended hours
useRTH=False, useRTH=False,
@ -181,9 +209,13 @@ class Client:
# TODO: raise underlying error here # TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?") raise ValueError(f"No bars retreived for {symbol}?")
# TODO: rewrite this faster with ``numba``
# convert to pandas dataframe: # convert to pandas dataframe:
df = ibis.util.df(bars) df = ibis.util.df(bars)
return from_df(df) return bars, from_df(df)
def onError(self, reqId, errorCode, errorString, contract) -> None:
breakpoint()
async def search_stocks( async def search_stocks(
self, self,
@ -237,6 +269,8 @@ class Client:
"""Get an unqualifed contract for the current "continous" future. """Get an unqualifed contract for the current "continous" future.
""" """
contcon = ibis.ContFuture(symbol, exchange=exchange) contcon = ibis.ContFuture(symbol, exchange=exchange)
# it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId) return ibis.Future(conId=frontcon.conId)
@ -279,10 +313,10 @@ class Client:
if exch in ('PURE', 'TSE'): # non-yankee if exch in ('PURE', 'TSE'): # non-yankee
currency = 'CAD' currency = 'CAD'
if exch in ('PURE',): if exch in ('PURE', 'TSE'):
# stupid ib... # stupid ib...
primaryExchange = exch
exch = 'SMART' exch = 'SMART'
primaryExchange = 'PURE'
con = ibis.Stock( con = ibis.Stock(
symbol=sym, symbol=sym,
@ -293,10 +327,27 @@ class Client:
try: try:
exch = 'SMART' if not exch else exch exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0] contract = (await self.ib.qualifyContractsAsync(con))[0]
head = await self.get_head_time(contract)
print(head)
except IndexError: except IndexError:
raise ValueError(f"No contract could be found {con}") raise ValueError(f"No contract could be found {con}")
return contract return contract
async def get_head_time(
self,
contract: Contract,
) -> datetime:
"""Return the first datetime stamp for ``contract``.
"""
return await self.ib.reqHeadTimeStampAsync(
contract,
whatToShow='TRADES',
useRTH=False,
formatDate=2, # timezone aware UTC datetime
)
async def stream_ticker( async def stream_ticker(
self, self,
symbol: str, symbol: str,
@ -309,7 +360,13 @@ class Client:
contract = await self.find_contract(symbol) contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
def push(t): def push(t):
"""Push quotes to trio task.
"""
# log.debug(t) # log.debug(t)
try: try:
to_trio.send_nowait(t) to_trio.send_nowait(t)
@ -346,9 +403,17 @@ async def _aio_get_client(
""" """
# first check cache for existing client # first check cache for existing client
# breakpoint()
try: try:
yield _client_cache[(host, port)] if port:
except KeyError: client = _client_cache[(host, port)]
else:
# grab first cached client
client = list(_client_cache.values())[0]
yield client
except (KeyError, IndexError):
# TODO: in case the arbiter has no record # TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one. # of existing brokerd we need to broadcast for one.
@ -359,9 +424,11 @@ async def _aio_get_client(
ib = NonShittyIB() ib = NonShittyIB()
ports = _try_ports if port is None else [port] ports = _try_ports if port is None else [port]
_err = None _err = None
for port in ports: for port in ports:
try: try:
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id) await ib.connectAsync(host, port, clientId=client_id)
break break
except ConnectionRefusedError as ce: except ConnectionRefusedError as ce:
@ -373,6 +440,7 @@ async def _aio_get_client(
try: try:
client = Client(ib) client = Client(ib)
_client_cache[(host, port)] = client _client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}")
yield client yield client
except BaseException: except BaseException:
ib.disconnect() ib.disconnect()
@ -385,7 +453,6 @@ async def _aio_run_client_method(
from_trio=None, from_trio=None,
**kwargs, **kwargs,
) -> None: ) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client: async with _aio_get_client() as client:
async_meth = getattr(client, meth) async_meth = getattr(client, meth)
@ -402,6 +469,9 @@ async def _trio_run_client_method(
method: str, method: str,
**kwargs, **kwargs,
) -> None: ) -> None:
"""Asyncio entry point to run tasks against the ``ib_insync`` api.
"""
ca = tractor.current_actor() ca = tractor.current_actor()
assert ca.is_infected_aio() assert ca.is_infected_aio()
@ -530,18 +600,60 @@ def normalize(
_local_buffer_writers = {} _local_buffer_writers = {}
@contextmanager @asynccontextmanager
def activate_writer(key: str): async def activate_writer(key: str) -> (bool, trio.Nursery):
try: try:
writer_already_exists = _local_buffer_writers.get(key, False) writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists: if not writer_already_exists:
_local_buffer_writers[key] = True _local_buffer_writers[key] = True
yield writer_already_exists async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally: finally:
_local_buffer_writers.pop(key, None) _local_buffer_writers.pop(key, None)
async def fill_bars(
first_bars,
shm,
count: int = 21,
) -> None:
"""Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
"""
next_dt = first_bars[0].date
i = 0
while i < count:
try:
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol='.'.join(
(first_bars.contract.symbol, first_bars.contract.exchange)
),
end_dt=next_dt,
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars[0].date
except RequestError as err:
# TODO: retreive underlying ``ib_insync`` error~~
if err.code == 162:
log.exception(
"Data query rate reached: Press `ctrl-alt-f` in TWS")
await tractor.breakpoint()
# TODO: figure out how to share quote feeds sanely despite # TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api. # the wacky ``ib_insync`` api.
# @tractor.msg.pub # @tractor.msg.pub
@ -575,7 +687,9 @@ async def stream_quotes(
# check if a writer already is alive in a streaming task, # check if a writer already is alive in a streaming task,
# otherwise start one and mark it as now existing # otherwise start one and mark it as now existing
with activate_writer(shm_token['shm_name']) as writer_already_exists: async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
# maybe load historical ohlcv in to shared mem # maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous # check if shm has already been created by previous
@ -588,18 +702,29 @@ async def stream_quotes(
# we are the buffer writer # we are the buffer writer
readonly=False, readonly=False,
) )
bars = await _trio_run_client_method(
# async def retrieve_and_push():
start = time.time()
bars, bars_array = await _trio_run_client_method(
method='bars', method='bars',
symbol=sym, symbol=sym,
) )
if bars is None: log.info(f"bars_array request: {time.time() - start}")
if bars_array is None:
raise SymbolNotFound(sym) raise SymbolNotFound(sym)
# write historical data to buffer # write historical data to buffer
shm.push(bars) shm.push(bars_array)
shm_token = shm.token shm_token = shm.token
# TODO: generalize this for other brokers
# start bar filler task in bg
ln.start_soon(fill_bars, bars, shm)
times = shm.array['time'] times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s) subscribe_ohlc_for_increment(shm, delay_s)
@ -656,6 +781,7 @@ async def stream_quotes(
# real-time stream # real-time stream
async for ticker in stream: async for ticker in stream:
# print(ticker.vwap)
quote = normalize( quote = normalize(
ticker, ticker,
calc_price=calc_price calc_price=calc_price
@ -674,6 +800,8 @@ async def stream_quotes(
for tick in iterticks(quote, types=('trade', 'utrade',)): for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price'] last = tick['price']
# print(f"{quote['symbol']}: {tick}")
# update last entry # update last entry
# benchmarked in the 4-5 us range # benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][ o, high, low, v = shm.array[-1][
@ -687,7 +815,13 @@ async def stream_quotes(
# is also the close/last trade price # is also the close/last trade price
o = last o = last
shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = ( shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o, o,
max(high, last), max(high, last),
min(low, last), min(low, last),

View File

@ -57,13 +57,15 @@ _ohlc_dtype = [
('close', float), ('close', float),
('volume', float), ('volume', float),
('count', int), ('count', int),
('vwap', float), ('bar_wap', float),
] ]
# UI components allow this to be declared such that additional # UI components allow this to be declared such that additional
# (historical) fields can be exposed. # (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype) ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = True
class Client: class Client:
@ -341,7 +343,7 @@ async def stream_quotes(
while True: while True:
try: try:
async with trio_websocket.open_websocket_url( async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com', 'wss://ws.kraken.com/',
) as ws: ) as ws:
# XXX: setup subs # XXX: setup subs
@ -433,7 +435,7 @@ async def stream_quotes(
'high', 'high',
'low', 'low',
'close', 'close',
'vwap', 'bar_wap', # in this case vwap of bar
'volume'] 'volume']
][-1] = ( ][-1] = (
o, o,

View File

@ -42,7 +42,7 @@ from ._sharedmem import (
ShmArray, ShmArray,
get_shm_token, get_shm_token,
) )
from ._source import base_ohlc_dtype from ._source import base_iohlc_dtype
from ._buffer import ( from ._buffer import (
increment_ohlc_buffer, increment_ohlc_buffer,
subscribe_ohlc_for_increment subscribe_ohlc_for_increment
@ -139,6 +139,7 @@ class Feed:
name: str name: str
stream: AsyncIterator[Dict[str, Any]] stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray shm: ShmArray
# ticks: ShmArray
_broker_portal: tractor._portal.Portal _broker_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
@ -188,7 +189,7 @@ async def open_feed(
key=sym_to_shm_key(name, symbols[0]), key=sym_to_shm_key(name, symbols[0]),
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype), dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write # we expect the sub-actor to write
readonly=True, readonly=True,

View File

@ -91,19 +91,20 @@ async def increment_ohlc_buffer(
# append new entry to buffer thus "incrementing" the bar # append new entry to buffer thus "incrementing" the bar
array = shm.array array = shm.array
last = array[-1:].copy() last = array[-1:][shm._write_fields].copy()
(index, t, close) = last[0][['index', 'time', 'close']] # (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum # this copies non-std fields (eg. vwap) from the last datum
last[ last[
['index', 'time', 'volume', 'open', 'high', 'low', 'close'] ['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (index + 1, t + delay_s, 0, close, close, close, close) ][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer # write to the buffer
shm.push(last) shm.push(last)
# broadcast the buffer index step # broadcast the buffer index step
yield {'index': shm._i.value} yield {'index': shm._last.value}
def subscribe_ohlc_for_increment( def subscribe_ohlc_for_increment(

View File

@ -33,6 +33,6 @@ def iterticks(
ticks = quote.get('ticks', ()) ticks = quote.get('ticks', ())
if ticks: if ticks:
for tick in ticks: for tick in ticks:
print(f"{quote['symbol']}: {tick}") # print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types: if tick.get('type') in types:
yield tick yield tick

View File

@ -17,11 +17,10 @@
""" """
NumPy compatible shared memory buffers for real-time FSP. NumPy compatible shared memory buffers for real-time FSP.
""" """
from typing import List
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from sys import byteorder from sys import byteorder
from typing import Tuple, Optional from typing import List, Tuple, Optional
from multiprocessing import shared_memory from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker from multiprocessing import resource_tracker as mantracker
from _posixshmem import shm_unlink from _posixshmem import shm_unlink
@ -29,7 +28,7 @@ import tractor
import numpy as np import numpy as np
from ..log import get_logger from ..log import get_logger
from ._source import base_ohlc_dtype from ._source import base_ohlc_dtype, base_iohlc_dtype
log = get_logger(__name__) log = get_logger(__name__)
@ -58,17 +57,15 @@ mantracker.getfd = mantracker._resource_tracker.getfd
class SharedInt: class SharedInt:
"""Wrapper around a single entry shared memory array which
holds an ``int`` value used as an index counter.
"""
def __init__( def __init__(
self, self,
token: str, shm: SharedMemory,
create: bool = False,
) -> None: ) -> None:
# create a single entry array for storing an index counter self._shm = shm
self._shm = shared_memory.SharedMemory(
name=token,
create=create,
size=4, # std int
)
@property @property
def value(self) -> int: def value(self) -> int:
@ -79,7 +76,7 @@ class SharedInt:
self._shm.buf[:] = value.to_bytes(4, byteorder) self._shm.buf[:] = value.to_bytes(4, byteorder)
def destroy(self) -> None: def destroy(self) -> None:
if shared_memory._USE_POSIX: if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker" # We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems. # nonsense meant for non-SC systems.
shm_unlink(self._shm.name) shm_unlink(self._shm.name)
@ -91,7 +88,8 @@ class _Token:
which can be used to key a system wide post shm entry. which can be used to key a system wide post shm entry.
""" """
shm_name: str # this servers as a "key" value shm_name: str # this servers as a "key" value
shm_counter_name: str shm_first_index_name: str
shm_last_index_name: str
dtype_descr: List[Tuple[str]] dtype_descr: List[Tuple[str]]
def __post_init__(self): def __post_init__(self):
@ -130,27 +128,47 @@ def _make_token(
"""Create a serializable token that can be used """Create a serializable token that can be used
to access a shared array. to access a shared array.
""" """
dtype = base_ohlc_dtype if dtype is None else dtype dtype = base_iohlc_dtype if dtype is None else dtype
return _Token( return _Token(
key, key,
key + "_counter", key + "_first",
key + "_last",
np.dtype(dtype).descr np.dtype(dtype).descr
) )
class ShmArray: class ShmArray:
"""A shared memory ``numpy`` (compatible) array API.
An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array
can be read and written to by pushing data both onto the "front"
or "back" of a set index range. The indexes for the "first" and
"last" index are themselves stored in shared memory (accessed via
``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index.
"""
def __init__( def __init__(
self, self,
shmarr: np.ndarray, shmarr: np.ndarray,
counter: SharedInt, first: SharedInt,
shm: shared_memory.SharedMemory, last: SharedInt,
readonly: bool = True, shm: SharedMemory,
# readonly: bool = True,
) -> None: ) -> None:
self._array = shmarr self._array = shmarr
self._i = counter
# indexes for first and last indices corresponding
# to fille data
self._first = first
self._last = last
self._len = len(shmarr) self._len = len(shmarr)
self._shm = shm self._shm = shm
self._readonly = readonly
# pushing data does not write the index (aka primary key)
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
# TODO: ringbuf api? # TODO: ringbuf api?
@ -158,24 +176,25 @@ class ShmArray:
def _token(self) -> _Token: def _token(self) -> _Token:
return _Token( return _Token(
self._shm.name, self._shm.name,
self._i._shm.name, self._first._shm.name,
self._last._shm.name,
self._array.dtype.descr, self._array.dtype.descr,
) )
@property @property
def token(self) -> dict: def token(self) -> dict:
"""Shared memory token that can be serialized """Shared memory token that can be serialized and used by
and used by another process to attach to this array. another process to attach to this array.
""" """
return self._token.as_msg() return self._token.as_msg()
@property @property
def index(self) -> int: def index(self) -> int:
return self._i.value % self._len return self._last.value % self._len
@property @property
def array(self) -> np.ndarray: def array(self) -> np.ndarray:
return self._array[:self._i.value] return self._array[self._first.value:self._last.value]
def last( def last(
self, self,
@ -186,38 +205,90 @@ class ShmArray:
def push( def push(
self, self,
data: np.ndarray, data: np.ndarray,
prepend: bool = False,
) -> int: ) -> int:
"""Ring buffer like "push" to append data """Ring buffer like "push" to append data
into the buffer and return updated index. into the buffer and return updated "last" index.
""" """
length = len(data) length = len(data)
# TODO: use .index for actual ring logic?
index = self._i.value if prepend:
index = self._first.value - length
else:
index = self._last.value
end = index + length end = index + length
self._array[index:end] = data[:]
self._i.value = end fields = self._write_fields
try:
self._array[fields][index:end] = data[fields][:]
if prepend:
self._first.value = index
else:
self._last.value = end
return end return end
except ValueError as err:
# shoudl raise if diff detected
self.diff_err_fields(data)
raise err
def diff_err_fields(
self,
data: np.ndarray,
) -> None:
# reraise with any field discrepancy
our_fields, their_fields = (
set(self._array.dtype.fields),
set(data.dtype.fields),
)
only_in_ours = our_fields - their_fields
only_in_theirs = their_fields - our_fields
if only_in_ours:
raise TypeError(
f"Input array is missing field(s): {only_in_ours}"
)
elif only_in_theirs:
raise TypeError(
f"Input array has unknown field(s): {only_in_theirs}"
)
def prepend(
self,
data: np.ndarray,
) -> int:
end = self.push(data, prepend=True)
assert end
def close(self) -> None: def close(self) -> None:
self._i._shm.close() self._first._shm.close()
self._last._shm.close()
self._shm.close() self._shm.close()
def destroy(self) -> None: def destroy(self) -> None:
if shared_memory._USE_POSIX: if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker" # We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems. # nonsense meant for non-SC systems.
shm_unlink(self._shm.name) shm_unlink(self._shm.name)
self._i.destroy()
self._first.destroy()
self._last.destroy()
def flush(self) -> None: def flush(self) -> None:
# TODO: flush to storage backend like markestore? # TODO: flush to storage backend like markestore?
... ...
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 12)
_default_size = 2 * _secs_in_day
def open_shm_array( def open_shm_array(
key: Optional[str] = None, key: Optional[str] = None,
# approx number of 5s bars in a "day" x2 size: int = _default_size,
size: int = int(2*60*60*10/5),
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
readonly: bool = False, readonly: bool = False,
) -> ShmArray: ) -> ShmArray:
@ -229,7 +300,9 @@ def open_shm_array(
# create new shared mem segment for which we # create new shared mem segment for which we
# have write permission # have write permission
a = np.zeros(size, dtype=dtype) a = np.zeros(size, dtype=dtype)
shm = shared_memory.SharedMemory( a['index'] = np.arange(len(a))
shm = SharedMemory(
name=key, name=key,
create=True, create=True,
size=a.nbytes size=a.nbytes
@ -243,17 +316,30 @@ def open_shm_array(
dtype=dtype dtype=dtype
) )
counter = SharedInt( # create single entry arrays for storing an first and last indices
token=token.shm_counter_name, first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=True, create=True,
size=4, # std int
) )
counter.value = 0 )
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=True,
size=4, # std int
)
)
last.value = first.value = int(_secs_in_day)
shmarr = ShmArray( shmarr = ShmArray(
array, array,
counter, first,
last,
shm, shm,
readonly=readonly,
) )
assert shmarr._token == token assert shmarr._token == token
@ -261,26 +347,31 @@ def open_shm_array(
# "unlink" created shm on process teardown by # "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack # pushing teardown calls onto actor context stack
actor = tractor.current_actor() tractor._actor._lifetime_stack.callback(shmarr.close)
actor._lifetime_stack.callback(shmarr.close) tractor._actor._lifetime_stack.callback(shmarr.destroy)
actor._lifetime_stack.callback(shmarr.destroy)
return shmarr return shmarr
def attach_shm_array( def attach_shm_array(
token: Tuple[str, str, Tuple[str, str]], token: Tuple[str, str, Tuple[str, str]],
size: int = int(60*60*10/5), size: int = _default_size,
readonly: bool = True, readonly: bool = True,
) -> ShmArray: ) -> ShmArray:
"""Load and attach to an existing shared memory array previously """Attach to an existing shared memory array previously
created by another process using ``open_shared_array``. created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write
access are constructed.
""" """
token = _Token.from_msg(token) token = _Token.from_msg(token)
key = token.shm_name key = token.shm_name
if key in _known_tokens: if key in _known_tokens:
assert _known_tokens[key] == token, "WTF" assert _known_tokens[key] == token, "WTF"
shm = shared_memory.SharedMemory(name=key) # attach to array buffer and view as per dtype
shm = SharedMemory(name=key)
shmarr = np.ndarray( shmarr = np.ndarray(
(size,), (size,),
dtype=token.dtype_descr, dtype=token.dtype_descr,
@ -288,15 +379,29 @@ def attach_shm_array(
) )
shmarr.setflags(write=int(not readonly)) shmarr.setflags(write=int(not readonly))
counter = SharedInt(token=token.shm_counter_name) first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=False,
size=4, # std int
),
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=False,
size=4, # std int
),
)
# make sure we can read # make sure we can read
counter.value first.value
sha = ShmArray( sha = ShmArray(
shmarr, shmarr,
counter, first,
last,
shm, shm,
readonly=readonly,
) )
# read test # read test
sha.array sha.array
@ -308,8 +413,8 @@ def attach_shm_array(
_known_tokens[key] = token _known_tokens[key] = token
# "close" attached shm on process teardown # "close" attached shm on process teardown
actor = tractor.current_actor() tractor._actor._lifetime_stack.callback(sha.close)
actor._lifetime_stack.callback(sha.close)
return sha return sha
@ -318,21 +423,20 @@ def maybe_open_shm_array(
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
**kwargs, **kwargs,
) -> Tuple[ShmArray, bool]: ) -> Tuple[ShmArray, bool]:
"""Attempt to attach to a shared memory block by a """Attempt to attach to a shared memory block using a "key" lookup
"key" determined by the users overall "system" to registered blocks in the users overall "system" registryt
(presumes you don't have the block's explicit token). (presumes you don't have the block's explicit token).
This function is meant to solve the problem of This function is meant to solve the problem of discovering whether
discovering whether a shared array token has been a shared array token has been allocated or discovered by the actor
allocated or discovered by the actor running in running in **this** process. Systems where multiple actors may seek
**this** process. Systems where multiple actors to access a common block can use this function to attempt to acquire
may seek to access a common block can use this a token as discovered by the actors who have previously stored
function to attempt to acquire a token as discovered a "key" -> ``_Token`` map in an actor local (aka python global)
by the actors who have previously stored a variable.
"key" -> ``_Token`` map in an actor local variable.
If you know the explicit ``_Token`` for your memory If you know the explicit ``_Token`` for your memory segment instead
instead use ``attach_shm_array``. use ``attach_shm_array``.
""" """
try: try:
# see if we already know this key # see if we already know this key

View File

@ -15,27 +15,36 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" """
Numpy data source machinery. numpy data source coversion helpers.
""" """
import decimal import decimal
from dataclasses import dataclass from dataclasses import dataclass
import numpy as np import numpy as np
import pandas as pd import pandas as pd
# from numba import from_dtype
# our minimum structured array layout for ohlc data ohlc_fields = [
base_ohlc_dtype = np.dtype(
[
('index', int),
('time', float), ('time', float),
('open', float), ('open', float),
('high', float), ('high', float),
('low', float), ('low', float),
('close', float), ('close', float),
('volume', int), ('volume', int),
('bar_wap', float),
] ]
)
ohlc_with_index = ohlc_fields.copy()
ohlc_with_index.insert(0, ('index', int))
# our minimum structured array layout for ohlc data
base_iohlc_dtype = np.dtype(ohlc_with_index)
base_ohlc_dtype = np.dtype(ohlc_fields)
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
# map time frame "keys" to minutes values # map time frame "keys" to minutes values
tf_in_1m = { tf_in_1m = {
@ -110,18 +119,27 @@ def from_df(
'Low': 'low', 'Low': 'low',
'Close': 'close', 'Close': 'close',
'Volume': 'volume', 'Volume': 'volume',
# most feeds are providing this over sesssion anchored
'vwap': 'bar_wap',
# XXX: ib_insync calls this the "wap of the bar"
# but no clue what is actually is...
# https://github.com/pikers/piker/issues/119#issuecomment-729120988
'average': 'bar_wap',
} }
df = df.rename(columns=columns) df = df.rename(columns=columns)
for name in df.columns: for name in df.columns:
if name not in base_ohlc_dtype.names[1:]: # if name not in base_ohlc_dtype.names[1:]:
if name not in base_ohlc_dtype.names:
del df[name] del df[name]
# TODO: it turns out column access on recarrays is actually slower: # TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays? # it might make sense to make these structured arrays?
array = df.to_records() array = df.to_records(index=False)
_nan_to_closest_num(array) _nan_to_closest_num(array)
return array return array

View File

@ -38,7 +38,7 @@ class Axis(pg.AxisItem):
def __init__( def __init__(
self, self,
linked_charts, linked_charts,
typical_max_str: str = '100 000.00', typical_max_str: str = '100 000.000',
min_tick: int = 2, min_tick: int = 2,
**kwargs **kwargs
) -> None: ) -> None:
@ -51,6 +51,8 @@ class Axis(pg.AxisItem):
self.setStyle(**{ self.setStyle(**{
'textFillLimits': [(0, 0.666)], 'textFillLimits': [(0, 0.666)],
'tickFont': _font.font, 'tickFont': _font.font,
# offset of text *away from* axis line in px
'tickTextOffset': 2,
}) })
self.setTickFont(_font.font) self.setTickFont(_font.font)
@ -88,8 +90,7 @@ class PriceAxis(Axis):
# print(f'digits: {digits}') # print(f'digits: {digits}')
return [ return [
('{value:,.{digits}f}') ('{value:,.{digits}f}').format(
.format(
digits=digits, digits=digits,
value=v, value=v,
).replace(',', ' ') for v in vals ).replace(',', ' ') for v in vals
@ -104,23 +105,36 @@ class DynamicDateAxis(Axis):
60: '%H:%M', 60: '%H:%M',
30: '%H:%M:%S', 30: '%H:%M:%S',
5: '%H:%M:%S', 5: '%H:%M:%S',
1: '%H:%M:%S',
} }
def resize(self) -> None: def resize(self) -> None:
self.setHeight(self.typical_br.height() + 3) self.setHeight(self.typical_br.height() + 1)
def _indexes_to_timestrs( def _indexes_to_timestrs(
self, self,
indexes: List[int], indexes: List[int],
) -> List[str]: ) -> List[str]:
bars = self.linked_charts.chart._array # try:
chart = self.linked_charts.chart
bars = chart._ohlc
shm = self.linked_charts.chart._shm
first = shm._first.value
bars_len = len(bars) bars_len = len(bars)
times = bars['time'] times = bars['time']
epochs = times[list( epochs = times[list(
map(int, filter(lambda i: i < bars_len, indexes)) map(
int,
filter(
lambda i: i > 0 and i < bars_len,
(i-first for i in indexes)
)
)
)] )]
# TODO: **don't** have this hard coded shift to EST # TODO: **don't** have this hard coded shift to EST
dts = pd.to_datetime(epochs, unit='s') # - 4*pd.offsets.Hour() dts = pd.to_datetime(epochs, unit='s') # - 4*pd.offsets.Hour()
@ -228,6 +242,7 @@ class AxisLabel(pg.GraphicsObject):
class XAxisLabel(AxisLabel): class XAxisLabel(AxisLabel):
_w_margin = 4
text_flags = ( text_flags = (
QtCore.Qt.TextDontClip QtCore.Qt.TextDontClip
@ -255,18 +270,17 @@ class XAxisLabel(AxisLabel):
w = self.boundingRect().width() w = self.boundingRect().width()
self.setPos(QPointF( self.setPos(QPointF(
abs_pos.x() - w / 2 - offset, abs_pos.x() - w / 2 - offset,
0, 1,
)) ))
self.update() self.update()
class YAxisLabel(AxisLabel): class YAxisLabel(AxisLabel):
_h_margin = 3 _h_margin = 2
# _w_margin = 1
text_flags = ( text_flags = (
# QtCore.Qt.AlignLeft QtCore.Qt.AlignLeft
QtCore.Qt.AlignHCenter # QtCore.Qt.AlignHCenter
| QtCore.Qt.AlignVCenter | QtCore.Qt.AlignVCenter
| QtCore.Qt.TextDontClip | QtCore.Qt.TextDontClip
) )
@ -289,7 +303,7 @@ class YAxisLabel(AxisLabel):
br = self.boundingRect() br = self.boundingRect()
h = br.height() h = br.height()
self.setPos(QPointF( self.setPos(QPointF(
0, 1,
abs_pos.y() - h / 2 - offset abs_pos.y() - h / 2 - offset
)) ))
self.update() self.update()

View File

@ -17,7 +17,7 @@
""" """
High level Qt chart widgets. High level Qt chart widgets.
""" """
from typing import Tuple, Dict, Any, Optional from typing import Tuple, Dict, Any, Optional, Callable
from functools import partial from functools import partial
from PyQt5 import QtCore, QtGui from PyQt5 import QtCore, QtGui
@ -105,6 +105,7 @@ class ChartSpace(QtGui.QWidget):
self.tf_layout.setContentsMargins(0, 12, 0, 0) self.tf_layout.setContentsMargins(0, 12, 0, 0)
time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN') time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN')
btn_prefix = 'TF' btn_prefix = 'TF'
for tf in time_frames: for tf in time_frames:
btn_name = ''.join([btn_prefix, tf]) btn_name = ''.join([btn_prefix, tf])
btn = QtGui.QPushButton(tf) btn = QtGui.QPushButton(tf)
@ -112,6 +113,7 @@ class ChartSpace(QtGui.QWidget):
btn.setEnabled(False) btn.setEnabled(False)
setattr(self, btn_name, btn) setattr(self, btn_name, btn)
self.tf_layout.addWidget(btn) self.tf_layout.addWidget(btn)
self.toolbar_layout.addLayout(self.tf_layout) self.toolbar_layout.addLayout(self.tf_layout)
# XXX: strat loader/saver that we don't need yet. # XXX: strat loader/saver that we don't need yet.
@ -123,8 +125,11 @@ class ChartSpace(QtGui.QWidget):
self, self,
symbol: str, symbol: str,
data: np.ndarray, data: np.ndarray,
ohlc: bool = True,
) -> None: ) -> None:
"""Load a new contract into the charting app. """Load a new contract into the charting app.
Expects a ``numpy`` structured array containing all the ohlcv fields.
""" """
# XXX: let's see if this causes mem problems # XXX: let's see if this causes mem problems
self.window.setWindowTitle(f'piker chart {symbol}') self.window.setWindowTitle(f'piker chart {symbol}')
@ -147,7 +152,8 @@ class ChartSpace(QtGui.QWidget):
if not self.v_layout.isEmpty(): if not self.v_layout.isEmpty():
self.v_layout.removeWidget(linkedcharts) self.v_layout.removeWidget(linkedcharts)
main_chart = linkedcharts.plot_main(s, data) main_chart = linkedcharts.plot_ohlc_main(s, data)
self.v_layout.addWidget(linkedcharts) self.v_layout.addWidget(linkedcharts)
return linkedcharts, main_chart return linkedcharts, main_chart
@ -175,7 +181,6 @@ class LinkedSplitCharts(QtGui.QWidget):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.signals_visible: bool = False self.signals_visible: bool = False
self._array: np.ndarray = None # main data source
self._ch: CrossHair = None # crosshair graphics self._ch: CrossHair = None # crosshair graphics
self.chart: ChartPlotWidget = None # main (ohlc) chart self.chart: ChartPlotWidget = None # main (ohlc) chart
self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {} self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {}
@ -184,11 +189,6 @@ class LinkedSplitCharts(QtGui.QWidget):
orientation='bottom', orientation='bottom',
linked_charts=self linked_charts=self
) )
self.xaxis_ind = DynamicDateAxis(
orientation='bottom',
linked_charts=self
)
# if _xaxis_at == 'bottom': # if _xaxis_at == 'bottom':
# self.xaxis.setStyle(showValues=False) # self.xaxis.setStyle(showValues=False)
# self.xaxis.hide() # self.xaxis.hide()
@ -216,20 +216,18 @@ class LinkedSplitCharts(QtGui.QWidget):
sizes.extend([min_h_ind] * len(self.subplots)) sizes.extend([min_h_ind] * len(self.subplots))
self.splitter.setSizes(sizes) # , int(self.height()*0.2) self.splitter.setSizes(sizes) # , int(self.height()*0.2)
def plot_main( def plot_ohlc_main(
self, self,
symbol: Symbol, symbol: Symbol,
array: np.ndarray, array: np.ndarray,
ohlc: bool = True, style: str = 'bar',
) -> 'ChartPlotWidget': ) -> 'ChartPlotWidget':
"""Start up and show main (price) chart and all linked subcharts. """Start up and show main (price) chart and all linked subcharts.
The data input struct array must include OHLC fields.
""" """
self.digits = symbol.digits() self.digits = symbol.digits()
# TODO: this should eventually be a view onto shared mem or some
# higher level type / API
self._array = array
# add crosshairs # add crosshairs
self._ch = CrossHair( self._ch = CrossHair(
linkedsplitcharts=self, linkedsplitcharts=self,
@ -239,11 +237,13 @@ class LinkedSplitCharts(QtGui.QWidget):
name=symbol.key, name=symbol.key,
array=array, array=array,
xaxis=self.xaxis, xaxis=self.xaxis,
ohlc=True, style=style,
_is_main=True, _is_main=True,
) )
# add crosshair graphic # add crosshair graphic
self.chart.addItem(self._ch) self.chart.addItem(self._ch)
# axis placement
if _xaxis_at == 'bottom': if _xaxis_at == 'bottom':
self.chart.hideAxis('bottom') self.chart.hideAxis('bottom')
@ -257,7 +257,7 @@ class LinkedSplitCharts(QtGui.QWidget):
name: str, name: str,
array: np.ndarray, array: np.ndarray,
xaxis: DynamicDateAxis = None, xaxis: DynamicDateAxis = None,
ohlc: bool = False, style: str = 'line',
_is_main: bool = False, _is_main: bool = False,
**cpw_kwargs, **cpw_kwargs,
) -> 'ChartPlotWidget': ) -> 'ChartPlotWidget':
@ -267,15 +267,25 @@ class LinkedSplitCharts(QtGui.QWidget):
""" """
if self.chart is None and not _is_main: if self.chart is None and not _is_main:
raise RuntimeError( raise RuntimeError(
"A main plot must be created first with `.plot_main()`") "A main plot must be created first with `.plot_ohlc_main()`")
# source of our custom interactions # source of our custom interactions
cv = ChartView() cv = ChartView()
cv.linked_charts = self cv.linked_charts = self
# use "indicator axis" by default # use "indicator axis" by default
xaxis = self.xaxis_ind if xaxis is None else xaxis if xaxis is None:
xaxis = DynamicDateAxis(
orientation='bottom',
linked_charts=self
)
cpw = ChartPlotWidget( cpw = ChartPlotWidget(
# this name will be used to register the primary
# graphics curve managed by the subchart
name=name,
array=array, array=array,
parent=self.splitter, parent=self.splitter,
axisItems={ axisItems={
@ -286,9 +296,12 @@ class LinkedSplitCharts(QtGui.QWidget):
cursor=self._ch, cursor=self._ch,
**cpw_kwargs, **cpw_kwargs,
) )
# this name will be used to register the primary
# graphics curve managed by the subchart # give viewbox a reference to primary chart
cpw.name = name # allowing for kb controls and interactions
# (see our custom view in `._interactions.py`)
cv.chart = cpw
cpw.plotItem.vb.linked_charts = self cpw.plotItem.vb.linked_charts = self
cpw.setFrameStyle(QtGui.QFrame.StyledPanel) # | QtGui.QFrame.Plain) cpw.setFrameStyle(QtGui.QFrame.StyledPanel) # | QtGui.QFrame.Plain)
cpw.hideButtons() cpw.hideButtons()
@ -302,11 +315,15 @@ class LinkedSplitCharts(QtGui.QWidget):
self._ch.add_plot(cpw) self._ch.add_plot(cpw)
# draw curve graphics # draw curve graphics
if ohlc: if style == 'bar':
cpw.draw_ohlc(name, array) cpw.draw_ohlc(name, array)
else:
elif style == 'line':
cpw.draw_curve(name, array) cpw.draw_curve(name, array)
else:
raise ValueError(f"Chart style {style} is currently unsupported")
if not _is_main: if not _is_main:
# track by name # track by name
self.subplots[name] = cpw self.subplots[name] = cpw
@ -316,6 +333,8 @@ class LinkedSplitCharts(QtGui.QWidget):
# XXX: we need this right? # XXX: we need this right?
# self.splitter.addWidget(cpw) # self.splitter.addWidget(cpw)
else:
assert style == 'bar', 'main chart must be OHLC'
return cpw return cpw
@ -341,6 +360,7 @@ class ChartPlotWidget(pg.PlotWidget):
def __init__( def __init__(
self, self,
# the data view we generate graphics from # the data view we generate graphics from
name: str,
array: np.ndarray, array: np.ndarray,
static_yrange: Optional[Tuple[float, float]] = None, static_yrange: Optional[Tuple[float, float]] = None,
cursor: Optional[CrossHair] = None, cursor: Optional[CrossHair] = None,
@ -353,16 +373,26 @@ class ChartPlotWidget(pg.PlotWidget):
# parent=None, # parent=None,
# plotItem=None, # plotItem=None,
# antialias=True, # antialias=True,
useOpenGL=True,
**kwargs **kwargs
) )
self.name = name
# self.setViewportMargins(0, 0, 0, 0) # self.setViewportMargins(0, 0, 0, 0)
self._array = array # readonly view of data self._ohlc = array # readonly view of ohlc data
self.default_view()
self._arrays = {} # readonly view of overlays
self._graphics = {} # registry of underlying graphics self._graphics = {} # registry of underlying graphics
self._overlays = {} # registry of overlay curves self._overlays = set() # registry of overlay curve names
self._labels = {} # registry of underlying graphics self._labels = {} # registry of underlying graphics
self._ysticks = {} # registry of underlying graphics self._ysticks = {} # registry of underlying graphics
self._vb = self.plotItem.vb self._vb = self.plotItem.vb
self._static_yrange = static_yrange # for "known y-range style" self._static_yrange = static_yrange # for "known y-range style"
self._view_mode: str = 'follow' self._view_mode: str = 'follow'
self._cursor = cursor # placehold for mouse self._cursor = cursor # placehold for mouse
@ -373,6 +403,7 @@ class ChartPlotWidget(pg.PlotWidget):
# show background grid # show background grid
self.showGrid(x=True, y=True, alpha=0.5) self.showGrid(x=True, y=True, alpha=0.5)
# TODO: stick in config
# use cross-hair for cursor? # use cross-hair for cursor?
# self.setCursor(QtCore.Qt.CrossCursor) # self.setCursor(QtCore.Qt.CrossCursor)
@ -387,14 +418,25 @@ class ChartPlotWidget(pg.PlotWidget):
self._vb.sigResized.connect(self._set_yrange) self._vb.sigResized.connect(self._set_yrange)
def last_bar_in_view(self) -> bool: def last_bar_in_view(self) -> bool:
self._array[-1]['index'] self._ohlc[-1]['index']
def update_contents_labels(self, index: int) -> None:
if index >= 0 and index < len(self._array):
array = self._array
def update_contents_labels(
self,
index: int,
# array_name: str,
) -> None:
if index >= 0 and index < self._ohlc[-1]['index']:
for name, (label, update) in self._labels.items(): for name, (label, update) in self._labels.items():
if name is self.name:
array = self._ohlc
else:
array = self._arrays[name]
try:
update(index, array) update(index, array)
except IndexError:
log.exception(f"Failed to update label: {name}")
def _set_xlimits( def _set_xlimits(
self, self,
@ -418,8 +460,11 @@ class ChartPlotWidget(pg.PlotWidget):
"""Return a range tuple for the bars present in view. """Return a range tuple for the bars present in view.
""" """
l, r = self.view_range() l, r = self.view_range()
lbar = max(l, 0) a = self._ohlc
rbar = min(r, len(self._array)) lbar = max(l, a[0]['index'])
rbar = min(r, a[-1]['index'])
# lbar = max(l, 0)
# rbar = min(r, len(self._ohlc))
return l, lbar, rbar, r return l, lbar, rbar, r
def default_view( def default_view(
@ -429,7 +474,8 @@ class ChartPlotWidget(pg.PlotWidget):
"""Set the view box to the "default" startup view of the scene. """Set the view box to the "default" startup view of the scene.
""" """
xlast = self._array[index]['index'] xlast = self._ohlc[index]['index']
print(xlast)
begin = xlast - _bars_to_left_in_follow_mode begin = xlast - _bars_to_left_in_follow_mode
end = xlast + _bars_from_right_in_follow_mode end = xlast + _bars_from_right_in_follow_mode
@ -450,7 +496,7 @@ class ChartPlotWidget(pg.PlotWidget):
self._vb.setXRange( self._vb.setXRange(
min=l + 1, min=l + 1,
max=r + 1, max=r + 1,
# holy shit, wtf dude... why tf would this not be 0 by # TODO: holy shit, wtf dude... why tf would this not be 0 by
# default... speechless. # default... speechless.
padding=0, padding=0,
) )
@ -465,6 +511,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Draw OHLC datums to chart. """Draw OHLC datums to chart.
""" """
graphics = style(self.plotItem) graphics = style(self.plotItem)
# adds all bar/candle graphics objects for each data point in # adds all bar/candle graphics objects for each data point in
# the np array buffer to be drawn on next render cycle # the np array buffer to be drawn on next render cycle
self.addItem(graphics) self.addItem(graphics)
@ -474,9 +521,11 @@ class ChartPlotWidget(pg.PlotWidget):
self._graphics[name] = graphics self._graphics[name] = graphics
label = ContentsLabel(chart=self, anchor_at=('top', 'left')) self.add_contents_label(
self._labels[name] = (label, partial(label.update_from_ohlc, name)) name,
label.show() anchor_at=('top', 'left'),
update_func=ContentsLabel.update_from_ohlc,
)
self.update_contents_labels(len(data) - 1) self.update_contents_labels(len(data) - 1)
self._add_sticky(name) self._add_sticky(name)
@ -488,41 +537,50 @@ class ChartPlotWidget(pg.PlotWidget):
name: str, name: str,
data: np.ndarray, data: np.ndarray,
overlay: bool = False, overlay: bool = False,
color: str = 'default_light',
add_label: bool = True,
**pdi_kwargs, **pdi_kwargs,
) -> pg.PlotDataItem: ) -> pg.PlotDataItem:
# draw the indicator as a plain curve """Draw a "curve" (line plot graphics) for the provided data in
the input array ``data``.
"""
_pdi_defaults = { _pdi_defaults = {
'pen': pg.mkPen(hcolor('default_light')), 'pen': pg.mkPen(hcolor(color)),
} }
pdi_kwargs.update(_pdi_defaults) pdi_kwargs.update(_pdi_defaults)
curve = pg.PlotDataItem( curve = pg.PlotDataItem(
data[name], y=data[name],
x=data['index'],
# antialias=True, # antialias=True,
name=name, name=name,
# TODO: see how this handles with custom ohlcv bars graphics # TODO: see how this handles with custom ohlcv bars graphics
# and/or if we can implement something similar for OHLC graphics
clipToView=True, clipToView=True,
**pdi_kwargs, **pdi_kwargs,
) )
self.addItem(curve) self.addItem(curve)
# register overlay curve with name # register curve graphics and backing array for name
self._graphics[name] = curve self._graphics[name] = curve
self._arrays[name] = data
if overlay: if overlay:
anchor_at = ('bottom', 'right') anchor_at = ('bottom', 'left')
self._overlays[name] = curve self._overlays.add(name)
else: else:
anchor_at = ('top', 'right') anchor_at = ('top', 'left')
# TODO: something instead of stickies for overlays # TODO: something instead of stickies for overlays
# (we need something that avoids clutter on x-axis). # (we need something that avoids clutter on x-axis).
self._add_sticky(name, bg_color='default_light') self._add_sticky(name, bg_color='default_light')
label = ContentsLabel(chart=self, anchor_at=anchor_at) if add_label:
self._labels[name] = (label, partial(label.update_from_value, name)) self.add_contents_label(name, anchor_at=anchor_at)
label.show()
self.update_contents_labels(len(data) - 1) self.update_contents_labels(len(data) - 1)
if self._cursor: if self._cursor:
@ -530,6 +588,23 @@ class ChartPlotWidget(pg.PlotWidget):
return curve return curve
def add_contents_label(
self,
name: str,
anchor_at: Tuple[str, str] = ('top', 'left'),
update_func: Callable = ContentsLabel.update_from_value,
) -> ContentsLabel:
label = ContentsLabel(chart=self, anchor_at=anchor_at)
self._labels[name] = (
# calls class method on instance
label,
partial(update_func, label, name)
)
label.show()
return label
def _add_sticky( def _add_sticky(
self, self,
name: str, name: str,
@ -556,9 +631,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Update the named internal graphics from ``array``. """Update the named internal graphics from ``array``.
""" """
if name not in self._overlays: self._ohlc = array
self._array = array
graphics = self._graphics[name] graphics = self._graphics[name]
graphics.update_from_array(array, **kwargs) graphics.update_from_array(array, **kwargs)
return graphics return graphics
@ -573,12 +646,18 @@ class ChartPlotWidget(pg.PlotWidget):
""" """
if name not in self._overlays: if name not in self._overlays:
self._array = array self._ohlc = array
else:
self._arrays[name] = array
curve = self._graphics[name] curve = self._graphics[name]
# TODO: we should instead implement a diff based # TODO: we should instead implement a diff based
# "only update with new items" on the pg.PlotDataItem # "only update with new items" on the pg.PlotCurveItem
curve.setData(array[name], **kwargs) # one place to dig around this might be the `QBackingStore`
# https://doc.qt.io/qt-5/qbackingstore.html
curve.setData(y=array[name], x=array['index'], **kwargs)
return curve return curve
def _set_yrange( def _set_yrange(
@ -612,8 +691,9 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: logic to check if end of bars in view # TODO: logic to check if end of bars in view
extra = view_len - _min_points_to_show extra = view_len - _min_points_to_show
begin = 0 - extra begin = self._ohlc[0]['index'] - extra
end = len(self._array) - 1 + extra # end = len(self._ohlc) - 1 + extra
end = self._ohlc[-1]['index'] - 1 + extra
# XXX: test code for only rendering lines for the bars in view. # XXX: test code for only rendering lines for the bars in view.
# This turns out to be very very poor perf when scaling out to # This turns out to be very very poor perf when scaling out to
@ -629,10 +709,15 @@ class ChartPlotWidget(pg.PlotWidget):
# f"view_len: {view_len}, bars_len: {bars_len}\n" # f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}" # f"begin: {begin}, end: {end}, extra: {extra}"
# ) # )
self._set_xlimits(begin, end) # self._set_xlimits(begin, end)
# TODO: this should be some kind of numpy view api # TODO: this should be some kind of numpy view api
bars = self._array[lbar:rbar] # bars = self._ohlc[lbar:rbar]
a = self._ohlc
ifirst = a[0]['index']
bars = a[lbar - ifirst:rbar - ifirst]
if not len(bars): if not len(bars):
# likely no data loaded yet or extreme scrolling? # likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}") log.error(f"WTF bars_range = {lbar}:{rbar}")
@ -644,39 +729,42 @@ class ChartPlotWidget(pg.PlotWidget):
ylow = np.nanmin(bars['low']) ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high']) yhigh = np.nanmax(bars['high'])
except (IndexError, ValueError): except (IndexError, ValueError):
# must be non-ohlc array? # likely non-ohlc array?
bars = bars[self.name]
ylow = np.nanmin(bars) ylow = np.nanmin(bars)
yhigh = np.nanmax(bars) yhigh = np.nanmax(bars)
# view margins: stay within a % of the "true range" # view margins: stay within a % of the "true range"
diff = yhigh - ylow diff = yhigh - ylow
ylow = ylow - (diff * 0.04) ylow = ylow - (diff * 0.04)
# yhigh = yhigh + (diff * 0.01) yhigh = yhigh + (diff * 0.04)
# compute contents label "height" in view terms # # compute contents label "height" in view terms
# to avoid having data "contents" overlap with them # # to avoid having data "contents" overlap with them
if self._labels: # if self._labels:
label = self._labels[self.name][0] # label = self._labels[self.name][0]
rect = label.itemRect() # rect = label.itemRect()
tl, br = rect.topLeft(), rect.bottomRight() # tl, br = rect.topLeft(), rect.bottomRight()
vb = self.plotItem.vb # vb = self.plotItem.vb
try: # try:
# on startup labels might not yet be rendered # # on startup labels might not yet be rendered
top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y()) # top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y())
# XXX: magic hack, how do we compute exactly? # # XXX: magic hack, how do we compute exactly?
label_h = (top - bottom) * 0.42 # label_h = (top - bottom) * 0.42
except np.linalg.LinAlgError: # except np.linalg.LinAlgError:
label_h = 0 # label_h = 0
else: # else:
label_h = 0 # label_h = 0
# print(f'label height {self.name}: {label_h}') # # print(f'label height {self.name}: {label_h}')
if label_h > yhigh - ylow: # if label_h > yhigh - ylow:
# label_h = 0
# print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
label_h = 0 label_h = 0
self.setLimits( self.setLimits(
@ -715,13 +803,6 @@ async def _async_main(
# chart_app.init_search() # chart_app.init_search()
# from ._exec import get_screen
# screen = get_screen(chart_app.geometry().bottomRight())
# XXX: bug zone if you try to ctl-c after this we get hangs again?
# wtf...
# await tractor.breakpoint()
# historical data fetch # historical data fetch
brokermod = brokers.get_brokermod(brokername) brokermod = brokers.get_brokermod(brokername)
@ -735,46 +816,62 @@ async def _async_main(
bars = ohlcv.array bars = ohlcv.array
# load in symbol's ohlc data # load in symbol's ohlc data
# await tractor.breakpoint()
linked_charts, chart = chart_app.load_symbol(sym, bars) linked_charts, chart = chart_app.load_symbol(sym, bars)
# plot historical vwap if available # plot historical vwap if available
vwap_in_history = False wap_in_history = False
if 'vwap' in bars.dtype.fields:
vwap_in_history = True if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve( chart.draw_curve(
name='vwap', name='bar_wap',
data=bars, data=bars,
overlay=True, add_label=False,
) )
chart._set_yrange() chart._set_yrange()
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
# eventually we'll support some kind of n-compose syntax
fsp_conf = {
# 'vwap': {
# 'overlay': True,
# 'anchor': 'session',
# },
'rsi': {
'period': 14,
'chart_kwargs': {
'static_yrange': (0, 100),
},
},
}
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators") # load initial fsp chain (otherwise known as "indicators")
n.start_soon( n.start_soon(
chart_from_fsp, spawn_fsps,
linked_charts, linked_charts,
'rsi', # eventually will be n-compose syntax fsp_conf,
sym, sym,
ohlcv, ohlcv,
brokermod, brokermod,
loglevel, loglevel,
) )
# update last price sticky
last_price_sticky = chart._ysticks[chart.name]
last_price_sticky.update_from_data(
*ohlcv.array[-1][['index', 'close']]
)
# start graphics update loop(s)after receiving first live quote # start graphics update loop(s)after receiving first live quote
n.start_soon( n.start_soon(
chart_from_quotes, chart_from_quotes,
chart, chart,
feed.stream, feed.stream,
ohlcv, ohlcv,
vwap_in_history, wap_in_history,
) )
# wait for a first quote before we start any update tasks # wait for a first quote before we start any update tasks
@ -797,9 +894,10 @@ async def chart_from_quotes(
chart: ChartPlotWidget, chart: ChartPlotWidget,
stream, stream,
ohlcv: np.ndarray, ohlcv: np.ndarray,
vwap_in_history: bool = False, wap_in_history: bool = False,
) -> None: ) -> None:
"""The 'main' (price) chart real-time update loop. """The 'main' (price) chart real-time update loop.
""" """
# TODO: bunch of stuff: # TODO: bunch of stuff:
# - I'm starting to think all this logic should be # - I'm starting to think all this logic should be
@ -809,24 +907,40 @@ async def chart_from_quotes(
# - update last open price correctly instead # - update last open price correctly instead
# of copying it from last bar's close # of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does? # - 5 sec bar lookback-autocorrection like tws does?
# update last price sticky
last_price_sticky = chart._ysticks[chart.name] last_price_sticky = chart._ysticks[chart.name]
last_price_sticky.update_from_data(
*ohlcv.array[-1][['index', 'close']]
)
def maxmin(): def maxmin():
# TODO: implement this # TODO: implement this
# https://arxiv.org/abs/cs/0610046 # https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin # https://github.com/lemire/pythonmaxmin
array = chart._array array = chart._ohlc
ifirst = array[0]['index']
last_bars_range = chart.bars_range() last_bars_range = chart.bars_range()
l, lbar, rbar, r = last_bars_range l, lbar, rbar, r = last_bars_range
in_view = array[lbar:rbar] in_view = array[lbar - ifirst:rbar - ifirst]
assert in_view.size
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
# TODO: when we start using line charts, probably want to make
# this an overloaded call on our `DataView
# sym = chart.name
# mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
return last_bars_range, mx, mn return last_bars_range, mx, mn
last_bars_range, last_mx, last_mn = maxmin()
chart.default_view() chart.default_view()
last_bars_range, last_mx, last_mn = maxmin()
last, volume = ohlcv.array[-1][['close', 'volume']] last, volume = ohlcv.array[-1][['close', 'volume']]
l1 = L1Labels( l1 = L1Labels(
@ -836,9 +950,16 @@ async def chart_from_quotes(
size_digits=min(float_digits(volume), 3) size_digits=min(float_digits(volume), 3)
) )
# TODO:
# - in theory we should be able to read buffer data faster
# then msgs arrive.. needs some tinkering and testing
# - if trade volume jumps above / below prior L1 price
# levels this might be dark volume we need to
# present differently?
async for quotes in stream: async for quotes in stream:
for sym, quote in quotes.items(): for sym, quote in quotes.items():
# print(f'CHART: {quote}')
for tick in quote.get('ticks', ()): for tick in quote.get('ticks', ()):
@ -847,7 +968,14 @@ async def chart_from_quotes(
price = tick.get('price') price = tick.get('price')
size = tick.get('size') size = tick.get('size')
if ticktype in ('trade', 'utrade'): # compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see
# def above.
brange, mx_in_view, mn_in_view = maxmin()
l, lbar, rbar, r = brange
if ticktype in ('trade', 'utrade', 'last'):
array = ohlcv.array array = ohlcv.array
# update price sticky(s) # update price sticky(s)
@ -856,30 +984,16 @@ async def chart_from_quotes(
*last[['index', 'close']] *last[['index', 'close']]
) )
# plot bars
# update price bar # update price bar
chart.update_ohlc_from_array( chart.update_ohlc_from_array(
chart.name, chart.name,
array, array,
) )
if vwap_in_history: if wap_in_history:
# update vwap overlay line # update vwap overlay line
chart.update_curve_from_array('vwap', ohlcv.array) chart.update_curve_from_array('bar_wap', ohlcv.array)
# TODO:
# - eventually we'll want to update bid/ask labels
# and other data as subscribed by underlying UI
# consumers.
# - in theory we should be able to read buffer data faster
# then msgs arrive.. needs some tinkering and testing
# if trade volume jumps above / below prior L1 price
# levels adjust bid / ask lines to match
# compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see
# def above.
brange, mx_in_view, mn_in_view = maxmin()
# XXX: prettty sure this is correct? # XXX: prettty sure this is correct?
# if ticktype in ('trade', 'last'): # if ticktype in ('trade', 'last'):
@ -910,7 +1024,7 @@ async def chart_from_quotes(
l1.bid_label.update_from_data(0, price) l1.bid_label.update_from_data(0, price)
# update min price in view to keep bid on screen # update min price in view to keep bid on screen
mn_in_view = max(price, mn_in_view) mn_in_view = min(price, mn_in_view)
if mx_in_view > last_mx or mn_in_view < last_mn: if mx_in_view > last_mx or mn_in_view < last_mn:
chart._set_yrange(yrange=(mn_in_view, mx_in_view)) chart._set_yrange(yrange=(mn_in_view, mx_in_view))
@ -923,9 +1037,10 @@ async def chart_from_quotes(
last_bars_range = brange last_bars_range = brange
async def chart_from_fsp( async def spawn_fsps(
linked_charts, linked_charts: LinkedSplitCharts,
fsp_func_name, # fsp_func_name,
fsps: Dict[str, str],
sym, sym,
src_shm, src_shm,
brokermod, brokermod,
@ -934,8 +1049,21 @@ async def chart_from_fsp(
"""Start financial signal processing in subactor. """Start financial signal processing in subactor.
Pass target entrypoint and historical data. Pass target entrypoint and historical data.
""" """
name = f'fsp.{fsp_func_name}' # spawns sub-processes which execute cpu bound FSP code
async with tractor.open_nursery() as n:
# spawns local task that consume and chart data streams from
# sub-procs
async with trio.open_nursery() as ln:
# Currently we spawn an actor per fsp chain but
# likely we'll want to pool them eventually to
# scale horizonatlly once cores are used up.
for fsp_func_name, conf in fsps.items():
display_name = f'fsp.{fsp_func_name}'
# TODO: load function here and introspect # TODO: load function here and introspect
# return stream type(s) # return stream type(s)
@ -943,9 +1071,9 @@ async def chart_from_fsp(
# TODO: should `index` be a required internal field? # TODO: should `index` be a required internal field?
fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
async with tractor.open_nursery() as n: key = f'{sym}.' + display_name
key = f'{sym}.' + name
# this is all sync currently
shm, opened = maybe_open_shm_array( shm, opened = maybe_open_shm_array(
key, key,
# TODO: create entry for each time frame # TODO: create entry for each time frame
@ -957,19 +1085,30 @@ async def chart_from_fsp(
# now until we figure out how to wrap fsps as "feeds". # now until we figure out how to wrap fsps as "feeds".
assert opened, f"A chart for {key} likely already exists?" assert opened, f"A chart for {key} likely already exists?"
# start fsp sub-actor conf['shm'] = shm
# spawn closure, can probably define elsewhere
async def spawn_fsp_daemon(
fsp_name: str,
display_name: str,
conf: dict,
):
"""Start an fsp subactor async.
"""
print(f'FSP NAME: {fsp_name}')
portal = await n.run_in_actor( portal = await n.run_in_actor(
# name as title of sub-chart # name as title of sub-chart
name, display_name,
# subactor entrypoint # subactor entrypoint
fsp.cascade, fsp.cascade,
brokername=brokermod.name, brokername=brokermod.name,
src_shm_token=src_shm.token, src_shm_token=src_shm.token,
dst_shm_token=shm.token, dst_shm_token=conf['shm'].token,
symbol=sym, symbol=sym,
fsp_func_name=fsp_func_name, fsp_func_name=fsp_name,
# tractor config # tractor config
loglevel=loglevel, loglevel=loglevel,
@ -981,6 +1120,54 @@ async def chart_from_fsp(
# data-array as first msg # data-array as first msg
_ = await stream.receive() _ = await stream.receive()
conf['stream'] = stream
conf['portal'] = portal
# new local task
ln.start_soon(
spawn_fsp_daemon,
fsp_func_name,
display_name,
conf,
)
# blocks here until all daemons up
# start and block on update loops
async with trio.open_nursery() as ln:
for fsp_func_name, conf in fsps.items():
ln.start_soon(
update_signals,
linked_charts,
fsp_func_name,
conf,
)
async def update_signals(
linked_charts: LinkedSplitCharts,
fsp_func_name: str,
conf: Dict[str, Any],
) -> None:
"""FSP stream chart update loop.
This is called once for each entry in the fsp
config map.
"""
shm = conf['shm']
if conf.get('overlay'):
chart = linked_charts.chart
chart.draw_curve(
name='vwap',
data=shm.array,
overlay=True,
)
last_val_sticky = None
else:
chart = linked_charts.add_plot( chart = linked_charts.add_plot(
name=fsp_func_name, name=fsp_func_name,
array=shm.array, array=shm.array,
@ -989,12 +1176,17 @@ async def chart_from_fsp(
ohlc=False, ohlc=False,
# settings passed down to ``ChartPlotWidget`` # settings passed down to ``ChartPlotWidget``
static_yrange=(0, 100), **conf.get('chart_kwargs', {})
# static_yrange=(0, 100),
) )
# display contents labels asap # display contents labels asap
chart.update_contents_labels(len(shm.array) - 1) chart.update_contents_labels(
len(shm.array) - 1,
# fsp_func_name
)
# read last value
array = shm.array array = shm.array
value = array[fsp_func_name][-1] value = array[fsp_func_name][-1]
@ -1002,7 +1194,8 @@ async def chart_from_fsp(
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(fsp_func_name, array) chart.update_curve_from_array(fsp_func_name, array)
chart.default_view()
chart._shm = shm
# TODO: figure out if we can roll our own `FillToThreshold` to # TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions. # get brush filled polygons for OS/OB conditions.
@ -1015,20 +1208,28 @@ async def chart_from_fsp(
# graphics.curve.setFillLevel(50) # graphics.curve.setFillLevel(50)
# add moveable over-[sold/bought] lines # add moveable over-[sold/bought] lines
level_line(chart, 30) # and labels only for the 70/30 lines
level_line(chart, 70, orient_v='top') level_line(chart, 20, show_label=False)
level_line(chart, 30, orient_v='top')
level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top', show_label=False)
chart._shm = shm
chart._set_yrange() chart._set_yrange()
stream = conf['stream']
# update chart graphics # update chart graphics
async for value in stream: async for value in stream:
# p = pg.debug.Profiler(disabled=False, delayed=False)
# read last
array = shm.array array = shm.array
value = array[-1][fsp_func_name] value = array[-1][fsp_func_name]
if last_val_sticky:
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)
# update graphics
chart.update_curve_from_array(fsp_func_name, array) chart.update_curve_from_array(fsp_func_name, array)
# p('rendered rsi datum')
async def check_for_new_bars(feed, ohlcv, linked_charts): async def check_for_new_bars(feed, ohlcv, linked_charts):
@ -1079,18 +1280,24 @@ async def check_for_new_bars(feed, ohlcv, linked_charts):
# resize view # resize view
# price_chart._set_yrange() # price_chart._set_yrange()
for name, curve in price_chart._overlays.items(): for name in price_chart._overlays:
# TODO: standard api for signal lookups per plot price_chart.update_curve_from_array(
if name in price_chart._array.dtype.fields: name,
price_chart._arrays[name]
)
# should have already been incremented above # # TODO: standard api for signal lookups per plot
price_chart.update_curve_from_array(name, price_chart._array) # if name in price_chart._ohlc.dtype.fields:
# # should have already been incremented above
# price_chart.update_curve_from_array(name, price_chart._ohlc)
for name, chart in linked_charts.subplots.items(): for name, chart in linked_charts.subplots.items():
chart.update_curve_from_array(chart.name, chart._shm.array) chart.update_curve_from_array(chart.name, chart._shm.array)
# chart._set_yrange() # chart._set_yrange()
# shift the view if in follow mode
price_chart.increment_view() price_chart.increment_view()

View File

@ -20,12 +20,15 @@ Trio - Qt integration
Run ``trio`` in guest mode on top of the Qt event loop. Run ``trio`` in guest mode on top of the Qt event loop.
All global Qt runtime settings are mostly defined here. All global Qt runtime settings are mostly defined here.
""" """
import os
import signal
from functools import partial from functools import partial
import traceback import traceback
from typing import Tuple, Callable, Dict, Any from typing import Tuple, Callable, Dict, Any
# Qt specific # Qt specific
import PyQt5 # noqa import PyQt5 # noqa
import pyqtgraph as pg
from pyqtgraph import QtGui from pyqtgraph import QtGui
from PyQt5 import QtCore from PyQt5 import QtCore
from PyQt5.QtCore import ( from PyQt5.QtCore import (
@ -37,6 +40,12 @@ import tractor
from outcome import Error from outcome import Error
# pyqtgraph global config
# might as well enable this for now?
pg.useOpenGL = True
pg.enableExperimental = True
# singleton app per actor # singleton app per actor
_qt_app: QtGui.QApplication = None _qt_app: QtGui.QApplication = None
_qt_win: QtGui.QMainWindow = None _qt_win: QtGui.QMainWindow = None
@ -67,6 +76,17 @@ class MainWindow(QtGui.QMainWindow):
self.setMinimumSize(*self.size) self.setMinimumSize(*self.size)
self.setWindowTitle(self.title) self.setWindowTitle(self.title)
def closeEvent(
self,
event: 'QCloseEvent'
) -> None:
"""Cancel the root actor asap.
"""
# raising KBI seems to get intercepted by by Qt so just use the
# system.
os.kill(os.getpid(), signal.SIGINT)
def run_qtractor( def run_qtractor(
func: Callable, func: Callable,
@ -115,10 +135,14 @@ def run_qtractor(
def done_callback(outcome): def done_callback(outcome):
print(f"Outcome: {outcome}")
if isinstance(outcome, Error): if isinstance(outcome, Error):
exc = outcome.error exc = outcome.error
if isinstance(outcome.error, KeyboardInterrupt):
# make it kinda look like ``trio``
print("Terminated!")
else:
traceback.print_exception(type(exc), exc, exc.__traceback__) traceback.print_exception(type(exc), exc, exc.__traceback__)
app.quit() app.quit()
@ -154,6 +178,7 @@ def run_qtractor(
main, main,
run_sync_soon_threadsafe=run_sync_soon_threadsafe, run_sync_soon_threadsafe=run_sync_soon_threadsafe,
done_callback=done_callback, done_callback=done_callback,
# restrict_keyboard_interrupt_to_checkpoints=True,
) )
window.main_widget = main_widget window.main_widget = main_widget

View File

@ -17,39 +17,44 @@
""" """
Chart graphics for displaying a slew of different data types. Chart graphics for displaying a slew of different data types.
""" """
import inspect
# import time
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
import numpy as np import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
# from numba import jit, float64, optional, int64 from numba import jit, float64, int64 # , optional
# from numba import types as ntypes
from PyQt5 import QtCore, QtGui from PyQt5 import QtCore, QtGui
from PyQt5.QtCore import QLineF, QPointF from PyQt5.QtCore import QLineF, QPointF
# from .._profile import timeit from .._profile import timeit
# from ..data._source import numba_ohlc_dtype
from ._style import ( from ._style import (
_xaxis_at, _xaxis_at,
hcolor, hcolor,
_font, _font,
_down_2_font_inches_we_like,
) )
from ._axes import YAxisLabel, XAxisLabel, YSticky from ._axes import YAxisLabel, XAxisLabel, YSticky
# XXX: these settings seem to result in really decent mouse scroll # XXX: these settings seem to result in really decent mouse scroll
# latency (in terms of perceived lag in cross hair) so really be sure # latency (in terms of perceived lag in cross hair) so really be sure
# there's an improvement if you want to change it. # there's an improvement if you want to change it!
_mouse_rate_limit = 60 # calc current screen refresh rate? _mouse_rate_limit = 60 # TODO; should we calc current screen refresh rate?
_debounce_delay = 1 / 2e3 _debounce_delay = 1 / 2e3
_ch_label_opac = 1 _ch_label_opac = 1
# TODO: we need to handle the case where index is outside
# the underlying datums range
class LineDot(pg.CurvePoint): class LineDot(pg.CurvePoint):
def __init__( def __init__(
self, self,
curve: pg.PlotCurveItem, curve: pg.PlotCurveItem,
index: int, index: int,
plot: 'ChartPlotWidget',
pos=None, pos=None,
size: int = 2, # in pxs size: int = 2, # in pxs
color: str = 'default_light', color: str = 'default_light',
@ -61,6 +66,7 @@ class LineDot(pg.CurvePoint):
pos=pos, pos=pos,
rotate=False, rotate=False,
) )
self._plot = plot
# TODO: get pen from curve if not defined? # TODO: get pen from curve if not defined?
cdefault = hcolor(color) cdefault = hcolor(color)
@ -80,6 +86,31 @@ class LineDot(pg.CurvePoint):
# keep a static size # keep a static size
self.setFlag(self.ItemIgnoresTransformations) self.setFlag(self.ItemIgnoresTransformations)
def event(
self,
ev: QtCore.QEvent,
) -> None:
# print((ev, type(ev)))
if not isinstance(ev, QtCore.QDynamicPropertyChangeEvent) or self.curve() is None:
return False
# if ev.propertyName() == 'index':
# print(ev)
# # self.setProperty
(x, y) = self.curve().getData()
index = self.property('index')
# first = self._plot._ohlc[0]['index']
# first = x[0]
# i = index - first
i = index - x[0]
if i > 0 and i < len(y):
newPos = (index, y[i])
QtGui.QGraphicsItem.setPos(self, *newPos)
return True
return False
_corner_anchors = { _corner_anchors = {
'top': 0, 'top': 0,
@ -91,8 +122,9 @@ _corner_anchors = {
_corner_margins = { _corner_margins = {
('top', 'left'): (-4, -5), ('top', 'left'): (-4, -5),
('top', 'right'): (4, -5), ('top', 'right'): (4, -5),
('bottom', 'left'): (-4, 5),
('bottom', 'right'): (4, 5), ('bottom', 'left'): (-4, lambda font_size: font_size * 2),
('bottom', 'right'): (4, lambda font_size: font_size * 2),
} }
@ -109,7 +141,10 @@ class ContentsLabel(pg.LabelItem):
font_size: Optional[int] = None, font_size: Optional[int] = None,
) -> None: ) -> None:
font_size = font_size or _font.font.pixelSize() font_size = font_size or _font.font.pixelSize()
super().__init__(justify=justify_text, size=f'{str(font_size)}px') super().__init__(
justify=justify_text,
size=f'{str(font_size)}px'
)
# anchor to viewbox # anchor to viewbox
self.setParentItem(chart._vb) self.setParentItem(chart._vb)
@ -120,6 +155,10 @@ class ContentsLabel(pg.LabelItem):
index = (_corner_anchors[h], _corner_anchors[v]) index = (_corner_anchors[h], _corner_anchors[v])
margins = _corner_margins[(v, h)] margins = _corner_margins[(v, h)]
ydim = margins[1]
if inspect.isfunction(margins[1]):
margins = margins[0], ydim(font_size)
self.anchor(itemPos=index, parentPos=index, offset=margins) self.anchor(itemPos=index, parentPos=index, offset=margins)
def update_from_ohlc( def update_from_ohlc(
@ -129,15 +168,19 @@ class ContentsLabel(pg.LabelItem):
array: np.ndarray, array: np.ndarray,
) -> None: ) -> None:
# this being "html" is the dumbest shit :eyeroll: # this being "html" is the dumbest shit :eyeroll:
first = array[0]['index']
self.setText( self.setText(
"<b>i</b>:{index}<br/>" "<b>i</b>:{index}<br/>"
"<b>O</b>:{}<br/>" "<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>" "<b>H</b>:{}<br/>"
"<b>L</b>:{}<br/>" "<b>L</b>:{}<br/>"
"<b>C</b>:{}<br/>" "<b>C</b>:{}<br/>"
"<b>V</b>:{}".format( "<b>V</b>:{}<br/>"
# *self._array[index].item()[2:8], "<b>wap</b>:{}".format(
*array[index].item()[2:8], *array[index - first][
['open', 'high', 'low', 'close', 'volume', 'bar_wap']
],
name=name, name=name,
index=index, index=index,
) )
@ -149,7 +192,9 @@ class ContentsLabel(pg.LabelItem):
index: int, index: int,
array: np.ndarray, array: np.ndarray,
) -> None: ) -> None:
data = array[index][name] first = array[0]['index']
if index < array[-1]['index'] and index > first:
data = array[index - first][name]
self.setText(f"{name}: {data:.2f}") self.setText(f"{name}: {data:.2f}")
@ -246,7 +291,7 @@ class CrossHair(pg.GraphicsObject):
) -> LineDot: ) -> LineDot:
# if this plot contains curves add line dot "cursors" to denote # if this plot contains curves add line dot "cursors" to denote
# the current sample under the mouse # the current sample under the mouse
cursor = LineDot(curve, index=len(plot._array)) cursor = LineDot(curve, index=plot._ohlc[-1]['index'], plot=plot)
plot.addItem(cursor) plot.addItem(cursor)
self.graphics[plot].setdefault('cursors', []).append(cursor) self.graphics[plot].setdefault('cursors', []).append(cursor)
return cursor return cursor
@ -308,8 +353,9 @@ class CrossHair(pg.GraphicsObject):
plot.update_contents_labels(ix) plot.update_contents_labels(ix)
# update all subscribed curve dots # update all subscribed curve dots
# first = plot._ohlc[0]['index']
for cursor in opts.get('cursors', ()): for cursor in opts.get('cursors', ()):
cursor.setIndex(ix) cursor.setIndex(ix) # - first)
# update the label on the bottom of the crosshair # update the label on the bottom of the crosshair
self.xaxis_label.update_label( self.xaxis_label.update_label(
@ -332,48 +378,23 @@ class CrossHair(pg.GraphicsObject):
return self.plots[0].boundingRect() return self.plots[0].boundingRect()
# @jit( def _mk_lines_array(
# # float64[:]( data: List,
# # float64[:], size: int,
# # optional(float64), elements_step: int = 6,
# # optional(int16) ) -> np.ndarray:
# # ), """Create an ndarray to hold lines graphics info.
# nopython=True,
# nogil=True
# )
def _mk_lines_array(data: List, size: int) -> np.ndarray:
"""Create an ndarray to hold lines graphics objects.
""" """
return np.zeros_like( return np.zeros_like(
data, data,
shape=(int(size), 3), shape=(int(size), elements_step),
dtype=object, dtype=object,
) )
# TODO: `numba` this? def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]:
open, high, low, close, index = row[
# @jit(
# # float64[:](
# # float64[:],
# # optional(float64),
# # optional(int16)
# # ),
# nopython=True,
# nogil=True
# )
def bars_from_ohlc(
data: np.ndarray,
w: float,
start: int = 0,
) -> np.ndarray:
"""Generate an array of lines objects from input ohlc data.
"""
lines = _mk_lines_array(data, data.shape[0])
for i, q in enumerate(data[start:], start=start):
open, high, low, close, index = q[
['open', 'high', 'low', 'close', 'index']] ['open', 'high', 'low', 'close', 'index']]
# high -> low vertical (body) line # high -> low vertical (body) line
@ -393,35 +414,91 @@ def bars_from_ohlc(
# close line # close line
c = QLineF(index, close, index + w, close) c = QLineF(index, close, index + w, close)
# indexing here is as per the below comments return [hl, o, c]
lines[i] = (hl, o, c)
# XXX: in theory we could get a further speedup by using a flat
# array and avoiding the call to `np.ravel()` below?
# lines[3*i:3*i+3] = (hl, o, c)
# XXX: legacy code from candles custom graphics: @jit(
# if not _tina_mode: # TODO: for now need to construct this manually for readonly arrays, see
# else _tina_mode: # https://github.com/numba/numba/issues/4511
# self.lines = lines = np.concatenate( # ntypes.Tuple((float64[:], float64[:], float64[:]))(
# [high_to_low, open_sticks, close_sticks]) # numba_ohlc_dtype[::1], # contiguous
# use traditional up/down green/red coloring # int64,
# long_bars = np.resize(Quotes.close > Quotes.open, len(lines)) # optional(float64),
# short_bars = np.resize( # ),
# Quotes.close < Quotes.open, len(lines)) nopython=True,
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
"""Generate an array of lines objects from input ohlc data.
# ups = lines[long_bars] """
# downs = lines[short_bars] size = int(data.shape[0] * 6)
# # draw "up" bars x = np.zeros(
# p.setPen(self.bull_brush) # data,
# p.drawLines(*ups) shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# # draw "down" bars # TODO: report bug for assert @
# p.setPen(self.bear_brush) # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
# p.drawLines(*downs) for i, q in enumerate(data[start:], start):
return lines # TODO: ask numba why this doesn't work..
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
open = q['open']
high = q['high']
low = q['low']
close = q['close']
index = float64(q['index'])
istart = i * 6
istop = istart + 6
# x,y detail the 6 points which connect all vertexes of a ohlc bar
x[istart:istop] = (
index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# specifies that the first edge is never connected to the
# prior bars last edge thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (0, 1, 1, 1, 1, 1)
return x, y, c
# @timeit
def gen_qpath(
data,
start, # XXX: do we need this?
w,
) -> QtGui.QPainterPath:
x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w)
# TODO: numba the internals of this!
return pg.functions.arrayToQPath(x, y, connect=c)
class BarItems(pg.GraphicsObject): class BarItems(pg.GraphicsObject):
@ -431,11 +508,10 @@ class BarItems(pg.GraphicsObject):
# 0.5 is no overlap between arms, 1.0 is full overlap # 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43 w: float = 0.43
bars_pen = pg.mkPen(hcolor('bracket'))
# XXX: tina mode, see below # XXX: for the mega-lulz increasing width here increases draw latency...
# bull_brush = pg.mkPen('#00cc00') # so probably don't do it until we figure that out.
# bear_brush = pg.mkPen('#fa0000') bars_pen = pg.mkPen(hcolor('bracket'))
def __init__( def __init__(
self, self,
@ -443,95 +519,87 @@ class BarItems(pg.GraphicsObject):
plotitem: 'pg.PlotItem', # noqa plotitem: 'pg.PlotItem', # noqa
) -> None: ) -> None:
super().__init__() super().__init__()
self.last = QtGui.QPicture()
self.history = QtGui.QPicture() self.last_bar = QtGui.QPicture()
# TODO: implement updateable pixmap solution
self.path = QtGui.QPainterPath()
# self._h_path = QtGui.QGraphicsPathItem(self.path)
self._pi = plotitem self._pi = plotitem
# self._scene = plotitem.vb.scene()
# self.picture = QtGui.QPixmap(1000, 300) self._xrange: Tuple[int, int]
# plotitem.addItem(self.picture) self._yrange: Tuple[float, float]
# self._pmi = None
# self._pmi = self._scene.addPixmap(self.picture)
# XXX: not sure this actually needs to be an array other # XXX: not sure this actually needs to be an array other
# then for the old tina mode calcs for up/down bars below? # then for the old tina mode calcs for up/down bars below?
# lines container # lines container
self.lines = _mk_lines_array([], 50e3) # self.lines = _mk_lines_array([], 50e3, 6)
# TODO: don't render the full backing array each time
# self._path_data = None
self._last_bar_lines: Optional[Tuple[QLineF, ...]] = None
# track the current length of drawable lines within the larger array # track the current length of drawable lines within the larger array
self.index: int = 0 self.start_index: int = 0
self.stop_index: int = 0
# @timeit # @timeit
def draw_from_data( def draw_from_data(
self, self,
data: np.ndarray, data: np.ndarray,
start: int = 0, start: int = 0,
): ) -> QtGui.QPainterPath:
"""Draw OHLC datum graphics from a ``np.ndarray``. """Draw OHLC datum graphics from a ``np.ndarray``.
This routine is usually only called to draw the initial history. This routine is usually only called to draw the initial history.
""" """
lines = bars_from_ohlc(data, self.w, start=start) self.path = gen_qpath(data, start, self.w)
# save graphics for later reference and keep track # save graphics for later reference and keep track
# of current internal "last index" # of current internal "last index"
index = len(lines) # self.start_index = len(data)
self.lines[:index] = lines index = data['index']
self.index = index self._xrange = (index[0], index[-1])
self._yrange = (
np.nanmax(data['high']),
np.nanmin(data['low']),
)
# up to last to avoid double draw of last bar # up to last to avoid double draw of last bar
self.draw_lines(just_history=True, iend=self.index - 1) self._last_bar_lines = lines_from_ohlc(data[-1], self.w)
self.draw_lines(iend=self.index)
# @timeit # create pics
def draw_lines( # self.draw_history()
self, self.draw_last_bar()
istart=0,
iend=None,
just_history=False,
# TODO: could get even fancier and only update the single close line?
lines=None,
) -> None:
"""Draw the current line set using the painter.
"""
if just_history:
# draw bars for the "history" picture
iend = iend or self.index - 1
pic = self.history
else:
# draw the last bar
istart = self.index - 1
iend = iend or self.index
pic = self.last
# use 2d array of lines objects, see conlusion on speed: # trigger render
# https://stackoverflow.com/a/60089929
flat = np.ravel(self.lines[istart:iend])
# TODO: do this with numba for speed gain:
# https://stackoverflow.com/questions/58422690/filtering-a-numpy-array-what-is-the-best-approach
to_draw = flat[np.where(flat != None)] # noqa
# pre-computing a QPicture object allows paint() to run much
# more quickly, rather than re-drawing the shapes every time.
p = QtGui.QPainter(pic)
p.setPen(self.bars_pen)
# TODO: is there any way to not have to pass all the lines every
# iteration? It seems they won't draw unless it's done this way..
p.drawLines(*to_draw)
p.end()
# XXX: if we ever try using `QPixmap` again...
# if self._pmi is None:
# self._pmi = self.scene().addPixmap(self.picture)
# else:
# self._pmi.setPixmap(self.picture)
# trigger re-render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update # https://doc.qt.io/qt-5/qgraphicsitem.html#update
self.update() self.update()
return self.path
# def update_ranges(
# self,
# xmn: int,
# xmx: int,
# ymn: float,
# ymx: float,
# ) -> None:
# ...
def draw_last_bar(self) -> None:
"""Currently this draws lines to a cached ``QPicture`` which
is supposed to speed things up on ``.paint()`` calls (which
is a call to ``QPainter.drawPicture()`` but I'm not so sure.
"""
p = QtGui.QPainter(self.last_bar)
p.setPen(self.bars_pen)
p.drawLines(*tuple(filter(bool, self._last_bar_lines)))
p.end()
# @timeit
def update_from_array( def update_from_array(
self, self,
array: np.ndarray, array: np.ndarray,
@ -545,32 +613,65 @@ class BarItems(pg.GraphicsObject):
graphics object, and then update/rerender, but here we're graphics object, and then update/rerender, but here we're
assuming the prior graphics havent changed (OHLC history rarely assuming the prior graphics havent changed (OHLC history rarely
does) so this "should" be simpler and faster. does) so this "should" be simpler and faster.
This routine should be made (transitively) as fast as possible.
""" """
index = self.index # index = self.start_index
length = len(array) istart, istop = self._xrange
extra = length - index
# start_bar_to_update = index - 100 index = array['index']
first_index, last_index = index[0], index[-1]
# length = len(array)
prepend_length = istart - first_index
append_length = last_index - istop
# TODO: allow mapping only a range of lines thus
# only drawing as many bars as exactly specified.
if prepend_length:
# new history was added and we need to render a new path
new_bars = array[:prepend_length]
prepend_path = gen_qpath(new_bars, 0, self.w)
# XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# y value not matching the first value from
# array[prepend_length + 1] ???
# update path
old_path = self.path
self.path = prepend_path
self.path.addPath(old_path)
if append_length:
# generate new lines objects for updatable "current bar"
self._last_bar_lines = lines_from_ohlc(array[-1], self.w)
self.draw_last_bar()
if extra > 0:
# generate new graphics to match provided array # generate new graphics to match provided array
new = array[index:index + extra] # path appending logic:
lines = bars_from_ohlc(new, self.w) # we need to get the previous "current bar(s)" for the time step
bars_added = len(lines) # and convert it to a sub-path to append to the historical set
self.lines[index:index + bars_added] = lines # new_bars = array[istop - 1:istop + append_length - 1]
self.index += bars_added new_bars = array[-append_length - 1:-1]
append_path = gen_qpath(new_bars, 0, self.w)
self.path.moveTo(float(istop - self.w), float(new_bars[0]['open']))
self.path.addPath(append_path)
self._xrange = first_index, last_index
# start_bar_to_update = index - bars_added
self.draw_lines(just_history=True)
if just_history: if just_history:
self.update()
return return
# current bar update # last bar update
i, o, h, l, last, v = array[-1][ i, o, h, l, last, v = array[-1][
['index', 'open', 'high', 'low', 'close', 'volume'] ['index', 'open', 'high', 'low', 'close', 'volume']
] ]
assert i == self.index - 1 # assert i == self.start_index - 1
body, larm, rarm = self.lines[i] assert i == last_index
body, larm, rarm = self._last_bar_lines
# XXX: is there a faster way to modify this? # XXX: is there a faster way to modify this?
rarm.setLine(rarm.x1(), last, rarm.x2(), last) rarm.setLine(rarm.x1(), last, rarm.x2(), last)
@ -579,16 +680,26 @@ class BarItems(pg.GraphicsObject):
if l != h: # noqa if l != h: # noqa
if body is None: if body is None:
body = self.lines[index - 1][0] = QLineF(i, l, i, h) body = self._last_bar_lines[0] = QLineF(i, l, i, h)
else: else:
# update body # update body
body.setLine(i, l, i, h) body.setLine(i, l, i, h)
else:
# XXX: h == l -> remove any HL line to avoid render bug
if body is not None:
body = self.lines[index - 1][0] = None
self.draw_lines(just_history=False) # XXX: pretty sure this is causing an issue where the bar has
# a large upward move right before the next sample and the body
# is getting set to None since the next bar is flat but the shm
# array index update wasn't read by the time this code runs. Iow
# we're doing this removal of the body for a bar index that is
# now out of date / from some previous sample. It's weird
# though because i've seen it do this to bars i - 3 back?
# else:
# # XXX: h == l -> remove any HL line to avoid render bug
# if body is not None:
# body = self.lines[index - 1][0] = None
self.draw_last_bar()
self.update()
# @timeit # @timeit
def paint(self, p, opt, widget): def paint(self, p, opt, widget):
@ -606,33 +717,36 @@ class BarItems(pg.GraphicsObject):
# as is necesarry for what's in "view". Not sure if this will # as is necesarry for what's in "view". Not sure if this will
# lead to any perf gains other then when zoomed in to less bars # lead to any perf gains other then when zoomed in to less bars
# in view. # in view.
p.drawPicture(0, 0, self.history) p.drawPicture(0, 0, self.last_bar)
p.drawPicture(0, 0, self.last)
# TODO: if we can ever make pixmaps work... p.setPen(self.bars_pen)
# p.drawPixmap(0, 0, self.picture) p.drawPath(self.path)
# self._pmi.setPixmap(self.picture)
# print(self.scene())
# profiler('bars redraw:')
# @timeit
def boundingRect(self): def boundingRect(self):
# TODO: can we do rect caching to make this faster?
# Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
# TODO: Can we do rect caching to make this faster
# like `pg.PlotCurveItem` does? In theory it's just
# computing max/min stuff again like we do in the udpate loop
# anyway. Not really sure it's necessary since profiling already
# shows this method is faf.
# boundingRect _must_ indicate the entire area that will be # boundingRect _must_ indicate the entire area that will be
# drawn on or else we will get artifacts and possibly crashing. # drawn on or else we will get artifacts and possibly crashing.
# (in this case, QPicture does all the work of computing the # (in this case, QPicture does all the work of computing the
# bounding rect for us). # bounding rect for us).
# compute aggregate bounding rectangle # compute aggregate bounding rectangle
lb = self.last.boundingRect() lb = self.last_bar.boundingRect()
hb = self.history.boundingRect() hb = self.path.boundingRect()
return QtCore.QRectF( return QtCore.QRectF(
# top left # top left
QtCore.QPointF(hb.topLeft()), QtCore.QPointF(hb.topLeft()),
# total size # total size
QtCore.QSizeF(lb.size() + hb.size()) QtCore.QSizeF(QtCore.QSizeF(lb.size()) + hb.size())
# QtCore.QSizeF(lb.size() + hb.size())
) )
@ -785,7 +899,7 @@ class L1Labels:
chart: 'ChartPlotWidget', # noqa chart: 'ChartPlotWidget', # noqa
digits: int = 2, digits: int = 2,
size_digits: int = 0, size_digits: int = 0,
font_size_inches: float = 4 / 53., font_size_inches: float = _down_2_font_inches_we_like,
) -> None: ) -> None:
self.chart = chart self.chart = chart
@ -839,7 +953,9 @@ def level_line(
digits: int = 1, digits: int = 1,
# size 4 font on 4k screen scaled down, so small-ish. # size 4 font on 4k screen scaled down, so small-ish.
font_size_inches: float = 4 / 53., font_size_inches: float = _down_2_font_inches_we_like,
show_label: bool = True,
**linelabelkwargs **linelabelkwargs
) -> LevelLine: ) -> LevelLine:
@ -859,6 +975,7 @@ def level_line(
**linelabelkwargs **linelabelkwargs
) )
label.update_from_data(0, level) label.update_from_data(0, level)
# TODO: can we somehow figure out a max value from the parent axis? # TODO: can we somehow figure out a max value from the parent axis?
label._size_br_from_str(label.label_str) label._size_br_from_str(label.label_str)
@ -874,4 +991,7 @@ def level_line(
chart.plotItem.addItem(line) chart.plotItem.addItem(line)
if not show_label:
label.hide()
return line return line

View File

@ -245,7 +245,7 @@ class ChartView(ViewBox):
log.debug("Max zoom bruh...") log.debug("Max zoom bruh...")
return return
if ev.delta() < 0 and vl >= len(self.linked_charts._array) + 666: if ev.delta() < 0 and vl >= len(self.linked_charts.chart._ohlc) + 666:
log.debug("Min zoom bruh...") log.debug("Min zoom bruh...")
return return
@ -268,9 +268,9 @@ class ChartView(ViewBox):
# ).map(furthest_right_coord) # ).map(furthest_right_coord)
# ) # )
# This seems like the most "intuitive option, a hybrdid of # This seems like the most "intuitive option, a hybrid of
# tws and tv styles # tws and tv styles
last_bar = pg.Point(rbar) last_bar = pg.Point(int(rbar))
self._resetTarget() self._resetTarget()
self.scaleBy(s, last_bar) self.scaleBy(s, last_bar)

View File

@ -18,6 +18,7 @@
Qt UI styling. Qt UI styling.
""" """
from typing import Optional from typing import Optional
import math
import pyqtgraph as pg import pyqtgraph as pg
from PyQt5 import QtCore, QtGui from PyQt5 import QtCore, QtGui
@ -27,10 +28,9 @@ from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
# chart-wide font # chart-wide fonts specified in inches
# font size 6px / 53 dpi (3x scaled down on 4k hidpi) _default_font_inches_we_like = 6 / 96
_default_font_inches_we_like = 6 / 53 # px / (px / inch) = inch _down_2_font_inches_we_like = 5 / 96
_down_2_font_inches_we_like = 4 / 53
class DpiAwareFont: class DpiAwareFont:
@ -66,8 +66,12 @@ class DpiAwareFont:
listed in the script in ``snippets/qt_screen_info.py``. listed in the script in ``snippets/qt_screen_info.py``.
""" """
dpi = screen.physicalDotsPerInch() # take the max since scaling can make things ugly in some cases
font_size = round(self._iwl * dpi) pdpi = screen.physicalDotsPerInch()
ldpi = screen.logicalDotsPerInch()
dpi = max(pdpi, ldpi)
font_size = math.floor(self._iwl * dpi)
log.info( log.info(
f"\nscreen:{screen.name()} with DPI: {dpi}" f"\nscreen:{screen.name()} with DPI: {dpi}"
f"\nbest font size is {font_size}\n" f"\nbest font size is {font_size}\n"