diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index cf509bfb..9731e37f 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``.
"""
-from contextlib import asynccontextmanager, contextmanager
+from contextlib import asynccontextmanager
from dataclasses import asdict
from functools import partial
+from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
import asyncio
import logging
@@ -32,6 +33,7 @@ import itertools
import time
from async_generator import aclosing
+from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails
from ib_insync.ticker import Ticker
import ib_insync as ibis
@@ -45,7 +47,7 @@ from ..data import (
maybe_spawn_brokerd,
iterticks,
attach_shm_array,
- get_shm_token,
+ # get_shm_token,
subscribe_ohlc_for_increment,
)
from ..data._source import from_df
@@ -86,6 +88,8 @@ _time_frames = {
'Y': 'OneYear',
}
+_show_wap_in_history = False
+
# overrides to sidestep pretty questionable design decisions in
# ``ib_insync``:
@@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = {
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
+_enters = 0
+
class Client:
"""IB wrapped for our broker backend API.
@@ -142,32 +148,54 @@ class Client:
self.ib = ib
self.ib.RaiseRequestErrors = True
+ # NOTE: the ib.client here is "throttled" to 45 rps by default
+
async def bars(
self,
symbol: str,
# EST in ISO 8601 format is required... below is EPOCH
- start_date: str = "1970-01-01T00:00:00.000000-05:00",
- time_frame: str = '1m',
- count: int = int(2e3), # <- max allowed per query
- is_paid_feed: bool = False,
+ start_dt: str = "1970-01-01T00:00:00.000000-05:00",
+ end_dt: str = "",
+
+ sample_period_s: str = 1, # ohlc sample period
+ period_count: int = int(2e3), # <- max per 1s sample query
+
+ is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present.
"""
bars_kwargs = {'whatToShow': 'TRADES'}
+ global _enters
+ print(f'ENTER BARS {_enters}')
+ _enters += 1
+
contract = await self.find_contract(symbol)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync(
contract,
- endDateTime='',
- # durationStr='60 S',
- # durationStr='1 D',
+ endDateTime=end_dt,
+
+ # time history length values format:
+ # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
+
+ # OHLC sampling values:
+ # 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
+ # 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
+ # 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
+ # barSizeSetting='1 secs',
+
+ # durationStr='{count} S'.format(count=15000 * 5),
+ # durationStr='{count} D'.format(count=1),
+ # barSizeSetting='5 secs',
+
+ durationStr='{count} S'.format(count=period_count),
+ barSizeSetting='1 secs',
+
+ # barSizeSetting='1 min',
- # time length calcs
- durationStr='{count} S'.format(count=5000 * 5),
- barSizeSetting='5 secs',
# always use extended hours
useRTH=False,
@@ -181,9 +209,13 @@ class Client:
# TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?")
+ # TODO: rewrite this faster with ``numba``
# convert to pandas dataframe:
df = ibis.util.df(bars)
- return from_df(df)
+ return bars, from_df(df)
+
+ def onError(self, reqId, errorCode, errorString, contract) -> None:
+ breakpoint()
async def search_stocks(
self,
@@ -237,6 +269,8 @@ class Client:
"""Get an unqualifed contract for the current "continous" future.
"""
contcon = ibis.ContFuture(symbol, exchange=exchange)
+
+ # it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId)
@@ -279,10 +313,10 @@ class Client:
if exch in ('PURE', 'TSE'): # non-yankee
currency = 'CAD'
- if exch in ('PURE',):
+ if exch in ('PURE', 'TSE'):
# stupid ib...
+ primaryExchange = exch
exch = 'SMART'
- primaryExchange = 'PURE'
con = ibis.Stock(
symbol=sym,
@@ -293,10 +327,27 @@ class Client:
try:
exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0]
+
+ head = await self.get_head_time(contract)
+ print(head)
except IndexError:
raise ValueError(f"No contract could be found {con}")
return contract
+ async def get_head_time(
+ self,
+ contract: Contract,
+ ) -> datetime:
+ """Return the first datetime stamp for ``contract``.
+
+ """
+ return await self.ib.reqHeadTimeStampAsync(
+ contract,
+ whatToShow='TRADES',
+ useRTH=False,
+ formatDate=2, # timezone aware UTC datetime
+ )
+
async def stream_ticker(
self,
symbol: str,
@@ -309,7 +360,13 @@ class Client:
contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
+ # define a simple queue push routine that streams quote packets
+ # to trio over the ``to_trio`` memory channel.
+
def push(t):
+ """Push quotes to trio task.
+
+ """
# log.debug(t)
try:
to_trio.send_nowait(t)
@@ -346,9 +403,17 @@ async def _aio_get_client(
"""
# first check cache for existing client
+ # breakpoint()
try:
- yield _client_cache[(host, port)]
- except KeyError:
+ if port:
+ client = _client_cache[(host, port)]
+ else:
+ # grab first cached client
+ client = list(_client_cache.values())[0]
+
+ yield client
+
+ except (KeyError, IndexError):
# TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one.
@@ -359,9 +424,11 @@ async def _aio_get_client(
ib = NonShittyIB()
ports = _try_ports if port is None else [port]
+
_err = None
for port in ports:
try:
+ log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id)
break
except ConnectionRefusedError as ce:
@@ -373,6 +440,7 @@ async def _aio_get_client(
try:
client = Client(ib)
_client_cache[(host, port)] = client
+ log.debug(f"Caching client for {(host, port)}")
yield client
except BaseException:
ib.disconnect()
@@ -385,7 +453,6 @@ async def _aio_run_client_method(
from_trio=None,
**kwargs,
) -> None:
- log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client:
async_meth = getattr(client, meth)
@@ -402,6 +469,9 @@ async def _trio_run_client_method(
method: str,
**kwargs,
) -> None:
+ """Asyncio entry point to run tasks against the ``ib_insync`` api.
+
+ """
ca = tractor.current_actor()
assert ca.is_infected_aio()
@@ -530,18 +600,60 @@ def normalize(
_local_buffer_writers = {}
-@contextmanager
-def activate_writer(key: str):
+@asynccontextmanager
+async def activate_writer(key: str) -> (bool, trio.Nursery):
try:
writer_already_exists = _local_buffer_writers.get(key, False)
+
if not writer_already_exists:
_local_buffer_writers[key] = True
- yield writer_already_exists
+ async with trio.open_nursery() as n:
+ yield writer_already_exists, n
+ else:
+ yield writer_already_exists, None
finally:
_local_buffer_writers.pop(key, None)
+async def fill_bars(
+ first_bars,
+ shm,
+ count: int = 21,
+) -> None:
+ """Fill historical bars into shared mem / storage afap.
+
+ TODO: avoid pacing constraints:
+ https://github.com/pikers/piker/issues/128
+
+ """
+ next_dt = first_bars[0].date
+
+ i = 0
+ while i < count:
+
+ try:
+ bars, bars_array = await _trio_run_client_method(
+ method='bars',
+ symbol='.'.join(
+ (first_bars.contract.symbol, first_bars.contract.exchange)
+ ),
+ end_dt=next_dt,
+
+ )
+ shm.push(bars_array, prepend=True)
+ i += 1
+ next_dt = bars[0].date
+
+ except RequestError as err:
+ # TODO: retreive underlying ``ib_insync`` error~~
+ if err.code == 162:
+ log.exception(
+ "Data query rate reached: Press `ctrl-alt-f` in TWS")
+
+ await tractor.breakpoint()
+
+
# TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api.
# @tractor.msg.pub
@@ -575,7 +687,9 @@ async def stream_quotes(
# check if a writer already is alive in a streaming task,
# otherwise start one and mark it as now existing
- with activate_writer(shm_token['shm_name']) as writer_already_exists:
+ async with activate_writer(
+ shm_token['shm_name']
+ ) as (writer_already_exists, ln):
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
@@ -588,18 +702,29 @@ async def stream_quotes(
# we are the buffer writer
readonly=False,
)
- bars = await _trio_run_client_method(
+
+ # async def retrieve_and_push():
+ start = time.time()
+
+ bars, bars_array = await _trio_run_client_method(
method='bars',
symbol=sym,
+
)
- if bars is None:
+ log.info(f"bars_array request: {time.time() - start}")
+
+ if bars_array is None:
raise SymbolNotFound(sym)
# write historical data to buffer
- shm.push(bars)
+ shm.push(bars_array)
shm_token = shm.token
+ # TODO: generalize this for other brokers
+ # start bar filler task in bg
+ ln.start_soon(fill_bars, bars, shm)
+
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
@@ -656,6 +781,7 @@ async def stream_quotes(
# real-time stream
async for ticker in stream:
+ # print(ticker.vwap)
quote = normalize(
ticker,
calc_price=calc_price
@@ -674,6 +800,8 @@ async def stream_quotes(
for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price']
+ # print(f"{quote['symbol']}: {tick}")
+
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
@@ -687,7 +815,13 @@ async def stream_quotes(
# is also the close/last trade price
o = last
- shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = (
+ shm.array[[
+ 'open',
+ 'high',
+ 'low',
+ 'close',
+ 'volume',
+ ]][-1] = (
o,
max(high, last),
min(low, last),
diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py
index 4329a981..2289f743 100644
--- a/piker/brokers/kraken.py
+++ b/piker/brokers/kraken.py
@@ -57,13 +57,15 @@ _ohlc_dtype = [
('close', float),
('volume', float),
('count', int),
- ('vwap', float),
+ ('bar_wap', float),
]
# UI components allow this to be declared such that additional
# (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype)
+_show_wap_in_history = True
+
class Client:
@@ -341,7 +343,7 @@ async def stream_quotes(
while True:
try:
async with trio_websocket.open_websocket_url(
- 'wss://ws.kraken.com',
+ 'wss://ws.kraken.com/',
) as ws:
# XXX: setup subs
@@ -433,7 +435,7 @@ async def stream_quotes(
'high',
'low',
'close',
- 'vwap',
+ 'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,
diff --git a/piker/data/__init__.py b/piker/data/__init__.py
index cae1347c..fa26801c 100644
--- a/piker/data/__init__.py
+++ b/piker/data/__init__.py
@@ -42,7 +42,7 @@ from ._sharedmem import (
ShmArray,
get_shm_token,
)
-from ._source import base_ohlc_dtype
+from ._source import base_iohlc_dtype
from ._buffer import (
increment_ohlc_buffer,
subscribe_ohlc_for_increment
@@ -139,6 +139,7 @@ class Feed:
name: str
stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray
+ # ticks: ShmArray
_broker_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
@@ -188,7 +189,7 @@ async def open_feed(
key=sym_to_shm_key(name, symbols[0]),
# use any broker defined ohlc dtype:
- dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype),
+ dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=True,
diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py
index 64460476..fed6b965 100644
--- a/piker/data/_buffer.py
+++ b/piker/data/_buffer.py
@@ -91,19 +91,20 @@ async def increment_ohlc_buffer(
# append new entry to buffer thus "incrementing" the bar
array = shm.array
- last = array[-1:].copy()
- (index, t, close) = last[0][['index', 'time', 'close']]
+ last = array[-1:][shm._write_fields].copy()
+ # (index, t, close) = last[0][['index', 'time', 'close']]
+ (t, close) = last[0][['time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum
last[
- ['index', 'time', 'volume', 'open', 'high', 'low', 'close']
- ][0] = (index + 1, t + delay_s, 0, close, close, close, close)
+ ['time', 'volume', 'open', 'high', 'low', 'close']
+ ][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer
shm.push(last)
# broadcast the buffer index step
- yield {'index': shm._i.value}
+ yield {'index': shm._last.value}
def subscribe_ohlc_for_increment(
diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py
index cbda6062..363f3c01 100644
--- a/piker/data/_normalize.py
+++ b/piker/data/_normalize.py
@@ -33,6 +33,6 @@ def iterticks(
ticks = quote.get('ticks', ())
if ticks:
for tick in ticks:
- print(f"{quote['symbol']}: {tick}")
+ # print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types:
yield tick
diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py
index 7f90d1ae..fb442d7c 100644
--- a/piker/data/_sharedmem.py
+++ b/piker/data/_sharedmem.py
@@ -17,11 +17,10 @@
"""
NumPy compatible shared memory buffers for real-time FSP.
"""
-from typing import List
from dataclasses import dataclass, asdict
from sys import byteorder
-from typing import Tuple, Optional
-from multiprocessing import shared_memory
+from typing import List, Tuple, Optional
+from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker
from _posixshmem import shm_unlink
@@ -29,7 +28,7 @@ import tractor
import numpy as np
from ..log import get_logger
-from ._source import base_ohlc_dtype
+from ._source import base_ohlc_dtype, base_iohlc_dtype
log = get_logger(__name__)
@@ -58,17 +57,15 @@ mantracker.getfd = mantracker._resource_tracker.getfd
class SharedInt:
+ """Wrapper around a single entry shared memory array which
+ holds an ``int`` value used as an index counter.
+
+ """
def __init__(
self,
- token: str,
- create: bool = False,
+ shm: SharedMemory,
) -> None:
- # create a single entry array for storing an index counter
- self._shm = shared_memory.SharedMemory(
- name=token,
- create=create,
- size=4, # std int
- )
+ self._shm = shm
@property
def value(self) -> int:
@@ -79,7 +76,7 @@ class SharedInt:
self._shm.buf[:] = value.to_bytes(4, byteorder)
def destroy(self) -> None:
- if shared_memory._USE_POSIX:
+ if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
@@ -91,7 +88,8 @@ class _Token:
which can be used to key a system wide post shm entry.
"""
shm_name: str # this servers as a "key" value
- shm_counter_name: str
+ shm_first_index_name: str
+ shm_last_index_name: str
dtype_descr: List[Tuple[str]]
def __post_init__(self):
@@ -130,27 +128,47 @@ def _make_token(
"""Create a serializable token that can be used
to access a shared array.
"""
- dtype = base_ohlc_dtype if dtype is None else dtype
+ dtype = base_iohlc_dtype if dtype is None else dtype
return _Token(
key,
- key + "_counter",
+ key + "_first",
+ key + "_last",
np.dtype(dtype).descr
)
class ShmArray:
+ """A shared memory ``numpy`` (compatible) array API.
+
+ An underlying shared memory buffer is allocated based on
+ a user specified ``numpy.ndarray``. This fixed size array
+ can be read and written to by pushing data both onto the "front"
+ or "back" of a set index range. The indexes for the "first" and
+ "last" index are themselves stored in shared memory (accessed via
+ ``SharedInt`` interfaces) values such that multiple processes can
+ interact with the same array using a synchronized-index.
+
+ """
def __init__(
self,
shmarr: np.ndarray,
- counter: SharedInt,
- shm: shared_memory.SharedMemory,
- readonly: bool = True,
+ first: SharedInt,
+ last: SharedInt,
+ shm: SharedMemory,
+ # readonly: bool = True,
) -> None:
self._array = shmarr
- self._i = counter
+
+ # indexes for first and last indices corresponding
+ # to fille data
+ self._first = first
+ self._last = last
+
self._len = len(shmarr)
self._shm = shm
- self._readonly = readonly
+
+ # pushing data does not write the index (aka primary key)
+ self._write_fields = list(shmarr.dtype.fields.keys())[1:]
# TODO: ringbuf api?
@@ -158,24 +176,25 @@ class ShmArray:
def _token(self) -> _Token:
return _Token(
self._shm.name,
- self._i._shm.name,
+ self._first._shm.name,
+ self._last._shm.name,
self._array.dtype.descr,
)
@property
def token(self) -> dict:
- """Shared memory token that can be serialized
- and used by another process to attach to this array.
+ """Shared memory token that can be serialized and used by
+ another process to attach to this array.
"""
return self._token.as_msg()
@property
def index(self) -> int:
- return self._i.value % self._len
+ return self._last.value % self._len
@property
def array(self) -> np.ndarray:
- return self._array[:self._i.value]
+ return self._array[self._first.value:self._last.value]
def last(
self,
@@ -186,38 +205,90 @@ class ShmArray:
def push(
self,
data: np.ndarray,
+ prepend: bool = False,
) -> int:
"""Ring buffer like "push" to append data
- into the buffer and return updated index.
+ into the buffer and return updated "last" index.
"""
length = len(data)
- # TODO: use .index for actual ring logic?
- index = self._i.value
+
+ if prepend:
+ index = self._first.value - length
+ else:
+ index = self._last.value
+
end = index + length
- self._array[index:end] = data[:]
- self._i.value = end
- return end
+
+ fields = self._write_fields
+
+ try:
+ self._array[fields][index:end] = data[fields][:]
+ if prepend:
+ self._first.value = index
+ else:
+ self._last.value = end
+ return end
+ except ValueError as err:
+ # shoudl raise if diff detected
+ self.diff_err_fields(data)
+
+ raise err
+
+ def diff_err_fields(
+ self,
+ data: np.ndarray,
+ ) -> None:
+ # reraise with any field discrepancy
+ our_fields, their_fields = (
+ set(self._array.dtype.fields),
+ set(data.dtype.fields),
+ )
+
+ only_in_ours = our_fields - their_fields
+ only_in_theirs = their_fields - our_fields
+
+ if only_in_ours:
+ raise TypeError(
+ f"Input array is missing field(s): {only_in_ours}"
+ )
+ elif only_in_theirs:
+ raise TypeError(
+ f"Input array has unknown field(s): {only_in_theirs}"
+ )
+
+ def prepend(
+ self,
+ data: np.ndarray,
+ ) -> int:
+ end = self.push(data, prepend=True)
+ assert end
def close(self) -> None:
- self._i._shm.close()
+ self._first._shm.close()
+ self._last._shm.close()
self._shm.close()
def destroy(self) -> None:
- if shared_memory._USE_POSIX:
+ if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
- self._i.destroy()
+
+ self._first.destroy()
+ self._last.destroy()
def flush(self) -> None:
# TODO: flush to storage backend like markestore?
...
+# how much is probably dependent on lifestyle
+_secs_in_day = int(60 * 60 * 12)
+_default_size = 2 * _secs_in_day
+
def open_shm_array(
key: Optional[str] = None,
- # approx number of 5s bars in a "day" x2
- size: int = int(2*60*60*10/5),
+ size: int = _default_size,
dtype: Optional[np.dtype] = None,
readonly: bool = False,
) -> ShmArray:
@@ -229,7 +300,9 @@ def open_shm_array(
# create new shared mem segment for which we
# have write permission
a = np.zeros(size, dtype=dtype)
- shm = shared_memory.SharedMemory(
+ a['index'] = np.arange(len(a))
+
+ shm = SharedMemory(
name=key,
create=True,
size=a.nbytes
@@ -243,17 +316,30 @@ def open_shm_array(
dtype=dtype
)
- counter = SharedInt(
- token=token.shm_counter_name,
- create=True,
+ # create single entry arrays for storing an first and last indices
+ first = SharedInt(
+ shm=SharedMemory(
+ name=token.shm_first_index_name,
+ create=True,
+ size=4, # std int
+ )
)
- counter.value = 0
+
+ last = SharedInt(
+ shm=SharedMemory(
+ name=token.shm_last_index_name,
+ create=True,
+ size=4, # std int
+ )
+ )
+
+ last.value = first.value = int(_secs_in_day)
shmarr = ShmArray(
array,
- counter,
+ first,
+ last,
shm,
- readonly=readonly,
)
assert shmarr._token == token
@@ -261,26 +347,31 @@ def open_shm_array(
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
- actor = tractor.current_actor()
- actor._lifetime_stack.callback(shmarr.close)
- actor._lifetime_stack.callback(shmarr.destroy)
+ tractor._actor._lifetime_stack.callback(shmarr.close)
+ tractor._actor._lifetime_stack.callback(shmarr.destroy)
+
return shmarr
def attach_shm_array(
token: Tuple[str, str, Tuple[str, str]],
- size: int = int(60*60*10/5),
+ size: int = _default_size,
readonly: bool = True,
) -> ShmArray:
- """Load and attach to an existing shared memory array previously
+ """Attach to an existing shared memory array previously
created by another process using ``open_shared_array``.
+
+ No new shared mem is allocated but wrapper types for read/write
+ access are constructed.
"""
token = _Token.from_msg(token)
key = token.shm_name
+
if key in _known_tokens:
assert _known_tokens[key] == token, "WTF"
- shm = shared_memory.SharedMemory(name=key)
+ # attach to array buffer and view as per dtype
+ shm = SharedMemory(name=key)
shmarr = np.ndarray(
(size,),
dtype=token.dtype_descr,
@@ -288,15 +379,29 @@ def attach_shm_array(
)
shmarr.setflags(write=int(not readonly))
- counter = SharedInt(token=token.shm_counter_name)
+ first = SharedInt(
+ shm=SharedMemory(
+ name=token.shm_first_index_name,
+ create=False,
+ size=4, # std int
+ ),
+ )
+ last = SharedInt(
+ shm=SharedMemory(
+ name=token.shm_last_index_name,
+ create=False,
+ size=4, # std int
+ ),
+ )
+
# make sure we can read
- counter.value
+ first.value
sha = ShmArray(
shmarr,
- counter,
+ first,
+ last,
shm,
- readonly=readonly,
)
# read test
sha.array
@@ -308,8 +413,8 @@ def attach_shm_array(
_known_tokens[key] = token
# "close" attached shm on process teardown
- actor = tractor.current_actor()
- actor._lifetime_stack.callback(sha.close)
+ tractor._actor._lifetime_stack.callback(sha.close)
+
return sha
@@ -318,21 +423,20 @@ def maybe_open_shm_array(
dtype: Optional[np.dtype] = None,
**kwargs,
) -> Tuple[ShmArray, bool]:
- """Attempt to attach to a shared memory block by a
- "key" determined by the users overall "system"
+ """Attempt to attach to a shared memory block using a "key" lookup
+ to registered blocks in the users overall "system" registryt
(presumes you don't have the block's explicit token).
- This function is meant to solve the problem of
- discovering whether a shared array token has been
- allocated or discovered by the actor running in
- **this** process. Systems where multiple actors
- may seek to access a common block can use this
- function to attempt to acquire a token as discovered
- by the actors who have previously stored a
- "key" -> ``_Token`` map in an actor local variable.
+ This function is meant to solve the problem of discovering whether
+ a shared array token has been allocated or discovered by the actor
+ running in **this** process. Systems where multiple actors may seek
+ to access a common block can use this function to attempt to acquire
+ a token as discovered by the actors who have previously stored
+ a "key" -> ``_Token`` map in an actor local (aka python global)
+ variable.
- If you know the explicit ``_Token`` for your memory
- instead use ``attach_shm_array``.
+ If you know the explicit ``_Token`` for your memory segment instead
+ use ``attach_shm_array``.
"""
try:
# see if we already know this key
diff --git a/piker/data/_source.py b/piker/data/_source.py
index 3ad6d3e8..26180443 100644
--- a/piker/data/_source.py
+++ b/piker/data/_source.py
@@ -15,27 +15,36 @@
# along with this program. If not, see .
"""
-Numpy data source machinery.
+numpy data source coversion helpers.
"""
import decimal
from dataclasses import dataclass
import numpy as np
import pandas as pd
+# from numba import from_dtype
+ohlc_fields = [
+ ('time', float),
+ ('open', float),
+ ('high', float),
+ ('low', float),
+ ('close', float),
+ ('volume', int),
+ ('bar_wap', float),
+]
+
+ohlc_with_index = ohlc_fields.copy()
+ohlc_with_index.insert(0, ('index', int))
+
# our minimum structured array layout for ohlc data
-base_ohlc_dtype = np.dtype(
- [
- ('index', int),
- ('time', float),
- ('open', float),
- ('high', float),
- ('low', float),
- ('close', float),
- ('volume', int),
- ]
-)
+base_iohlc_dtype = np.dtype(ohlc_with_index)
+base_ohlc_dtype = np.dtype(ohlc_fields)
+
+# TODO: for now need to construct this manually for readonly arrays, see
+# https://github.com/numba/numba/issues/4511
+# numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
# map time frame "keys" to minutes values
tf_in_1m = {
@@ -110,18 +119,27 @@ def from_df(
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
+
+ # most feeds are providing this over sesssion anchored
+ 'vwap': 'bar_wap',
+
+ # XXX: ib_insync calls this the "wap of the bar"
+ # but no clue what is actually is...
+ # https://github.com/pikers/piker/issues/119#issuecomment-729120988
+ 'average': 'bar_wap',
}
df = df.rename(columns=columns)
for name in df.columns:
- if name not in base_ohlc_dtype.names[1:]:
+ # if name not in base_ohlc_dtype.names[1:]:
+ if name not in base_ohlc_dtype.names:
del df[name]
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
- array = df.to_records()
+ array = df.to_records(index=False)
_nan_to_closest_num(array)
return array
diff --git a/piker/ui/_axes.py b/piker/ui/_axes.py
index e0be7178..d2bac3cd 100644
--- a/piker/ui/_axes.py
+++ b/piker/ui/_axes.py
@@ -38,7 +38,7 @@ class Axis(pg.AxisItem):
def __init__(
self,
linked_charts,
- typical_max_str: str = '100 000.00',
+ typical_max_str: str = '100 000.000',
min_tick: int = 2,
**kwargs
) -> None:
@@ -51,6 +51,8 @@ class Axis(pg.AxisItem):
self.setStyle(**{
'textFillLimits': [(0, 0.666)],
'tickFont': _font.font,
+ # offset of text *away from* axis line in px
+ 'tickTextOffset': 2,
})
self.setTickFont(_font.font)
@@ -88,11 +90,10 @@ class PriceAxis(Axis):
# print(f'digits: {digits}')
return [
- ('{value:,.{digits}f}')
- .format(
- digits=digits,
- value=v,
- ).replace(',', ' ') for v in vals
+ ('{value:,.{digits}f}').format(
+ digits=digits,
+ value=v,
+ ).replace(',', ' ') for v in vals
]
@@ -104,23 +105,36 @@ class DynamicDateAxis(Axis):
60: '%H:%M',
30: '%H:%M:%S',
5: '%H:%M:%S',
+ 1: '%H:%M:%S',
}
def resize(self) -> None:
- self.setHeight(self.typical_br.height() + 3)
+ self.setHeight(self.typical_br.height() + 1)
def _indexes_to_timestrs(
self,
indexes: List[int],
) -> List[str]:
- bars = self.linked_charts.chart._array
+ # try:
+ chart = self.linked_charts.chart
+ bars = chart._ohlc
+ shm = self.linked_charts.chart._shm
+ first = shm._first.value
+
bars_len = len(bars)
times = bars['time']
epochs = times[list(
- map(int, filter(lambda i: i < bars_len, indexes))
+ map(
+ int,
+ filter(
+ lambda i: i > 0 and i < bars_len,
+ (i-first for i in indexes)
+ )
+ )
)]
+
# TODO: **don't** have this hard coded shift to EST
dts = pd.to_datetime(epochs, unit='s') # - 4*pd.offsets.Hour()
@@ -228,6 +242,7 @@ class AxisLabel(pg.GraphicsObject):
class XAxisLabel(AxisLabel):
+ _w_margin = 4
text_flags = (
QtCore.Qt.TextDontClip
@@ -255,18 +270,17 @@ class XAxisLabel(AxisLabel):
w = self.boundingRect().width()
self.setPos(QPointF(
abs_pos.x() - w / 2 - offset,
- 0,
+ 1,
))
self.update()
class YAxisLabel(AxisLabel):
- _h_margin = 3
- # _w_margin = 1
+ _h_margin = 2
text_flags = (
- # QtCore.Qt.AlignLeft
- QtCore.Qt.AlignHCenter
+ QtCore.Qt.AlignLeft
+ # QtCore.Qt.AlignHCenter
| QtCore.Qt.AlignVCenter
| QtCore.Qt.TextDontClip
)
@@ -283,13 +297,13 @@ class YAxisLabel(AxisLabel):
) -> None:
# this is read inside ``.paint()``
- self.label_str = '{value:,.{digits}f}'.format(
+ self.label_str = ' {value:,.{digits}f}'.format(
digits=self.digits, value=value).replace(',', ' ')
br = self.boundingRect()
h = br.height()
self.setPos(QPointF(
- 0,
+ 1,
abs_pos.y() - h / 2 - offset
))
self.update()
diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py
index 0caf0d17..a708a82e 100644
--- a/piker/ui/_chart.py
+++ b/piker/ui/_chart.py
@@ -17,7 +17,7 @@
"""
High level Qt chart widgets.
"""
-from typing import Tuple, Dict, Any, Optional
+from typing import Tuple, Dict, Any, Optional, Callable
from functools import partial
from PyQt5 import QtCore, QtGui
@@ -105,6 +105,7 @@ class ChartSpace(QtGui.QWidget):
self.tf_layout.setContentsMargins(0, 12, 0, 0)
time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN')
btn_prefix = 'TF'
+
for tf in time_frames:
btn_name = ''.join([btn_prefix, tf])
btn = QtGui.QPushButton(tf)
@@ -112,6 +113,7 @@ class ChartSpace(QtGui.QWidget):
btn.setEnabled(False)
setattr(self, btn_name, btn)
self.tf_layout.addWidget(btn)
+
self.toolbar_layout.addLayout(self.tf_layout)
# XXX: strat loader/saver that we don't need yet.
@@ -123,8 +125,11 @@ class ChartSpace(QtGui.QWidget):
self,
symbol: str,
data: np.ndarray,
+ ohlc: bool = True,
) -> None:
"""Load a new contract into the charting app.
+
+ Expects a ``numpy`` structured array containing all the ohlcv fields.
"""
# XXX: let's see if this causes mem problems
self.window.setWindowTitle(f'piker chart {symbol}')
@@ -147,7 +152,8 @@ class ChartSpace(QtGui.QWidget):
if not self.v_layout.isEmpty():
self.v_layout.removeWidget(linkedcharts)
- main_chart = linkedcharts.plot_main(s, data)
+ main_chart = linkedcharts.plot_ohlc_main(s, data)
+
self.v_layout.addWidget(linkedcharts)
return linkedcharts, main_chart
@@ -175,7 +181,6 @@ class LinkedSplitCharts(QtGui.QWidget):
def __init__(self):
super().__init__()
self.signals_visible: bool = False
- self._array: np.ndarray = None # main data source
self._ch: CrossHair = None # crosshair graphics
self.chart: ChartPlotWidget = None # main (ohlc) chart
self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {}
@@ -184,11 +189,6 @@ class LinkedSplitCharts(QtGui.QWidget):
orientation='bottom',
linked_charts=self
)
- self.xaxis_ind = DynamicDateAxis(
- orientation='bottom',
- linked_charts=self
- )
-
# if _xaxis_at == 'bottom':
# self.xaxis.setStyle(showValues=False)
# self.xaxis.hide()
@@ -216,20 +216,18 @@ class LinkedSplitCharts(QtGui.QWidget):
sizes.extend([min_h_ind] * len(self.subplots))
self.splitter.setSizes(sizes) # , int(self.height()*0.2)
- def plot_main(
+ def plot_ohlc_main(
self,
symbol: Symbol,
array: np.ndarray,
- ohlc: bool = True,
+ style: str = 'bar',
) -> 'ChartPlotWidget':
"""Start up and show main (price) chart and all linked subcharts.
+
+ The data input struct array must include OHLC fields.
"""
self.digits = symbol.digits()
- # TODO: this should eventually be a view onto shared mem or some
- # higher level type / API
- self._array = array
-
# add crosshairs
self._ch = CrossHair(
linkedsplitcharts=self,
@@ -239,11 +237,13 @@ class LinkedSplitCharts(QtGui.QWidget):
name=symbol.key,
array=array,
xaxis=self.xaxis,
- ohlc=True,
+ style=style,
_is_main=True,
)
# add crosshair graphic
self.chart.addItem(self._ch)
+
+ # axis placement
if _xaxis_at == 'bottom':
self.chart.hideAxis('bottom')
@@ -257,7 +257,7 @@ class LinkedSplitCharts(QtGui.QWidget):
name: str,
array: np.ndarray,
xaxis: DynamicDateAxis = None,
- ohlc: bool = False,
+ style: str = 'line',
_is_main: bool = False,
**cpw_kwargs,
) -> 'ChartPlotWidget':
@@ -267,15 +267,25 @@ class LinkedSplitCharts(QtGui.QWidget):
"""
if self.chart is None and not _is_main:
raise RuntimeError(
- "A main plot must be created first with `.plot_main()`")
+ "A main plot must be created first with `.plot_ohlc_main()`")
# source of our custom interactions
cv = ChartView()
cv.linked_charts = self
# use "indicator axis" by default
- xaxis = self.xaxis_ind if xaxis is None else xaxis
+ if xaxis is None:
+ xaxis = DynamicDateAxis(
+ orientation='bottom',
+ linked_charts=self
+ )
+
cpw = ChartPlotWidget(
+
+ # this name will be used to register the primary
+ # graphics curve managed by the subchart
+ name=name,
+
array=array,
parent=self.splitter,
axisItems={
@@ -286,9 +296,12 @@ class LinkedSplitCharts(QtGui.QWidget):
cursor=self._ch,
**cpw_kwargs,
)
- # this name will be used to register the primary
- # graphics curve managed by the subchart
- cpw.name = name
+
+ # give viewbox a reference to primary chart
+ # allowing for kb controls and interactions
+ # (see our custom view in `._interactions.py`)
+ cv.chart = cpw
+
cpw.plotItem.vb.linked_charts = self
cpw.setFrameStyle(QtGui.QFrame.StyledPanel) # | QtGui.QFrame.Plain)
cpw.hideButtons()
@@ -302,11 +315,15 @@ class LinkedSplitCharts(QtGui.QWidget):
self._ch.add_plot(cpw)
# draw curve graphics
- if ohlc:
+ if style == 'bar':
cpw.draw_ohlc(name, array)
- else:
+
+ elif style == 'line':
cpw.draw_curve(name, array)
+ else:
+ raise ValueError(f"Chart style {style} is currently unsupported")
+
if not _is_main:
# track by name
self.subplots[name] = cpw
@@ -316,6 +333,8 @@ class LinkedSplitCharts(QtGui.QWidget):
# XXX: we need this right?
# self.splitter.addWidget(cpw)
+ else:
+ assert style == 'bar', 'main chart must be OHLC'
return cpw
@@ -341,6 +360,7 @@ class ChartPlotWidget(pg.PlotWidget):
def __init__(
self,
# the data view we generate graphics from
+ name: str,
array: np.ndarray,
static_yrange: Optional[Tuple[float, float]] = None,
cursor: Optional[CrossHair] = None,
@@ -353,16 +373,26 @@ class ChartPlotWidget(pg.PlotWidget):
# parent=None,
# plotItem=None,
# antialias=True,
+ useOpenGL=True,
**kwargs
)
+
+ self.name = name
+
# self.setViewportMargins(0, 0, 0, 0)
- self._array = array # readonly view of data
+ self._ohlc = array # readonly view of ohlc data
+ self.default_view()
+
+ self._arrays = {} # readonly view of overlays
self._graphics = {} # registry of underlying graphics
- self._overlays = {} # registry of overlay curves
+ self._overlays = set() # registry of overlay curve names
+
self._labels = {} # registry of underlying graphics
self._ysticks = {} # registry of underlying graphics
+
self._vb = self.plotItem.vb
self._static_yrange = static_yrange # for "known y-range style"
+
self._view_mode: str = 'follow'
self._cursor = cursor # placehold for mouse
@@ -373,6 +403,7 @@ class ChartPlotWidget(pg.PlotWidget):
# show background grid
self.showGrid(x=True, y=True, alpha=0.5)
+ # TODO: stick in config
# use cross-hair for cursor?
# self.setCursor(QtCore.Qt.CrossCursor)
@@ -387,14 +418,25 @@ class ChartPlotWidget(pg.PlotWidget):
self._vb.sigResized.connect(self._set_yrange)
def last_bar_in_view(self) -> bool:
- self._array[-1]['index']
-
- def update_contents_labels(self, index: int) -> None:
- if index >= 0 and index < len(self._array):
- array = self._array
+ self._ohlc[-1]['index']
+ def update_contents_labels(
+ self,
+ index: int,
+ # array_name: str,
+ ) -> None:
+ if index >= 0 and index < self._ohlc[-1]['index']:
for name, (label, update) in self._labels.items():
- update(index, array)
+
+ if name is self.name:
+ array = self._ohlc
+ else:
+ array = self._arrays[name]
+
+ try:
+ update(index, array)
+ except IndexError:
+ log.exception(f"Failed to update label: {name}")
def _set_xlimits(
self,
@@ -418,8 +460,11 @@ class ChartPlotWidget(pg.PlotWidget):
"""Return a range tuple for the bars present in view.
"""
l, r = self.view_range()
- lbar = max(l, 0)
- rbar = min(r, len(self._array))
+ a = self._ohlc
+ lbar = max(l, a[0]['index'])
+ rbar = min(r, a[-1]['index'])
+ # lbar = max(l, 0)
+ # rbar = min(r, len(self._ohlc))
return l, lbar, rbar, r
def default_view(
@@ -429,7 +474,8 @@ class ChartPlotWidget(pg.PlotWidget):
"""Set the view box to the "default" startup view of the scene.
"""
- xlast = self._array[index]['index']
+ xlast = self._ohlc[index]['index']
+ print(xlast)
begin = xlast - _bars_to_left_in_follow_mode
end = xlast + _bars_from_right_in_follow_mode
@@ -450,7 +496,7 @@ class ChartPlotWidget(pg.PlotWidget):
self._vb.setXRange(
min=l + 1,
max=r + 1,
- # holy shit, wtf dude... why tf would this not be 0 by
+ # TODO: holy shit, wtf dude... why tf would this not be 0 by
# default... speechless.
padding=0,
)
@@ -465,6 +511,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Draw OHLC datums to chart.
"""
graphics = style(self.plotItem)
+
# adds all bar/candle graphics objects for each data point in
# the np array buffer to be drawn on next render cycle
self.addItem(graphics)
@@ -474,9 +521,11 @@ class ChartPlotWidget(pg.PlotWidget):
self._graphics[name] = graphics
- label = ContentsLabel(chart=self, anchor_at=('top', 'left'))
- self._labels[name] = (label, partial(label.update_from_ohlc, name))
- label.show()
+ self.add_contents_label(
+ name,
+ anchor_at=('top', 'left'),
+ update_func=ContentsLabel.update_from_ohlc,
+ )
self.update_contents_labels(len(data) - 1)
self._add_sticky(name)
@@ -488,48 +537,74 @@ class ChartPlotWidget(pg.PlotWidget):
name: str,
data: np.ndarray,
overlay: bool = False,
+ color: str = 'default_light',
+ add_label: bool = True,
**pdi_kwargs,
) -> pg.PlotDataItem:
- # draw the indicator as a plain curve
+ """Draw a "curve" (line plot graphics) for the provided data in
+ the input array ``data``.
+
+ """
_pdi_defaults = {
- 'pen': pg.mkPen(hcolor('default_light')),
+ 'pen': pg.mkPen(hcolor(color)),
}
pdi_kwargs.update(_pdi_defaults)
curve = pg.PlotDataItem(
- data[name],
+ y=data[name],
+ x=data['index'],
# antialias=True,
name=name,
+
# TODO: see how this handles with custom ohlcv bars graphics
+ # and/or if we can implement something similar for OHLC graphics
clipToView=True,
+
**pdi_kwargs,
)
self.addItem(curve)
- # register overlay curve with name
+ # register curve graphics and backing array for name
self._graphics[name] = curve
+ self._arrays[name] = data
if overlay:
- anchor_at = ('bottom', 'right')
- self._overlays[name] = curve
+ anchor_at = ('bottom', 'left')
+ self._overlays.add(name)
else:
- anchor_at = ('top', 'right')
+ anchor_at = ('top', 'left')
# TODO: something instead of stickies for overlays
# (we need something that avoids clutter on x-axis).
self._add_sticky(name, bg_color='default_light')
- label = ContentsLabel(chart=self, anchor_at=anchor_at)
- self._labels[name] = (label, partial(label.update_from_value, name))
- label.show()
- self.update_contents_labels(len(data) - 1)
+ if add_label:
+ self.add_contents_label(name, anchor_at=anchor_at)
+ self.update_contents_labels(len(data) - 1)
if self._cursor:
self._cursor.add_curve_cursor(self, curve)
return curve
+ def add_contents_label(
+ self,
+ name: str,
+ anchor_at: Tuple[str, str] = ('top', 'left'),
+ update_func: Callable = ContentsLabel.update_from_value,
+ ) -> ContentsLabel:
+
+ label = ContentsLabel(chart=self, anchor_at=anchor_at)
+ self._labels[name] = (
+ # calls class method on instance
+ label,
+ partial(update_func, label, name)
+ )
+ label.show()
+
+ return label
+
def _add_sticky(
self,
name: str,
@@ -556,9 +631,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Update the named internal graphics from ``array``.
"""
- if name not in self._overlays:
- self._array = array
-
+ self._ohlc = array
graphics = self._graphics[name]
graphics.update_from_array(array, **kwargs)
return graphics
@@ -573,12 +646,18 @@ class ChartPlotWidget(pg.PlotWidget):
"""
if name not in self._overlays:
- self._array = array
+ self._ohlc = array
+ else:
+ self._arrays[name] = array
curve = self._graphics[name]
+
# TODO: we should instead implement a diff based
- # "only update with new items" on the pg.PlotDataItem
- curve.setData(array[name], **kwargs)
+ # "only update with new items" on the pg.PlotCurveItem
+ # one place to dig around this might be the `QBackingStore`
+ # https://doc.qt.io/qt-5/qbackingstore.html
+ curve.setData(y=array[name], x=array['index'], **kwargs)
+
return curve
def _set_yrange(
@@ -612,8 +691,9 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: logic to check if end of bars in view
extra = view_len - _min_points_to_show
- begin = 0 - extra
- end = len(self._array) - 1 + extra
+ begin = self._ohlc[0]['index'] - extra
+ # end = len(self._ohlc) - 1 + extra
+ end = self._ohlc[-1]['index'] - 1 + extra
# XXX: test code for only rendering lines for the bars in view.
# This turns out to be very very poor perf when scaling out to
@@ -629,10 +709,15 @@ class ChartPlotWidget(pg.PlotWidget):
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
# )
- self._set_xlimits(begin, end)
+ # self._set_xlimits(begin, end)
# TODO: this should be some kind of numpy view api
- bars = self._array[lbar:rbar]
+ # bars = self._ohlc[lbar:rbar]
+
+ a = self._ohlc
+ ifirst = a[0]['index']
+ bars = a[lbar - ifirst:rbar - ifirst]
+
if not len(bars):
# likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}")
@@ -644,40 +729,43 @@ class ChartPlotWidget(pg.PlotWidget):
ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high'])
except (IndexError, ValueError):
- # must be non-ohlc array?
+ # likely non-ohlc array?
+ bars = bars[self.name]
ylow = np.nanmin(bars)
yhigh = np.nanmax(bars)
# view margins: stay within a % of the "true range"
diff = yhigh - ylow
ylow = ylow - (diff * 0.04)
- # yhigh = yhigh + (diff * 0.01)
+ yhigh = yhigh + (diff * 0.04)
- # compute contents label "height" in view terms
- # to avoid having data "contents" overlap with them
- if self._labels:
- label = self._labels[self.name][0]
+ # # compute contents label "height" in view terms
+ # # to avoid having data "contents" overlap with them
+ # if self._labels:
+ # label = self._labels[self.name][0]
- rect = label.itemRect()
- tl, br = rect.topLeft(), rect.bottomRight()
- vb = self.plotItem.vb
+ # rect = label.itemRect()
+ # tl, br = rect.topLeft(), rect.bottomRight()
+ # vb = self.plotItem.vb
- try:
- # on startup labels might not yet be rendered
- top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y())
+ # try:
+ # # on startup labels might not yet be rendered
+ # top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y())
- # XXX: magic hack, how do we compute exactly?
- label_h = (top - bottom) * 0.42
+ # # XXX: magic hack, how do we compute exactly?
+ # label_h = (top - bottom) * 0.42
- except np.linalg.LinAlgError:
- label_h = 0
- else:
- label_h = 0
+ # except np.linalg.LinAlgError:
+ # label_h = 0
+ # else:
+ # label_h = 0
- # print(f'label height {self.name}: {label_h}')
+ # # print(f'label height {self.name}: {label_h}')
- if label_h > yhigh - ylow:
- label_h = 0
+ # if label_h > yhigh - ylow:
+ # label_h = 0
+ # print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
+ label_h = 0
self.setLimits(
yMin=ylow,
@@ -715,13 +803,6 @@ async def _async_main(
# chart_app.init_search()
- # from ._exec import get_screen
- # screen = get_screen(chart_app.geometry().bottomRight())
-
- # XXX: bug zone if you try to ctl-c after this we get hangs again?
- # wtf...
- # await tractor.breakpoint()
-
# historical data fetch
brokermod = brokers.get_brokermod(brokername)
@@ -735,46 +816,62 @@ async def _async_main(
bars = ohlcv.array
# load in symbol's ohlc data
+ # await tractor.breakpoint()
linked_charts, chart = chart_app.load_symbol(sym, bars)
# plot historical vwap if available
- vwap_in_history = False
- if 'vwap' in bars.dtype.fields:
- vwap_in_history = True
- chart.draw_curve(
- name='vwap',
- data=bars,
- overlay=True,
- )
+ wap_in_history = False
+
+ if brokermod._show_wap_in_history:
+
+ if 'bar_wap' in bars.dtype.fields:
+ wap_in_history = True
+ chart.draw_curve(
+ name='bar_wap',
+ data=bars,
+ add_label=False,
+ )
chart._set_yrange()
+ # TODO: a data view api that makes this less shit
+ chart._shm = ohlcv
+
+ # eventually we'll support some kind of n-compose syntax
+ fsp_conf = {
+ # 'vwap': {
+ # 'overlay': True,
+ # 'anchor': 'session',
+ # },
+ 'rsi': {
+ 'period': 14,
+ 'chart_kwargs': {
+ 'static_yrange': (0, 100),
+ },
+ },
+
+ }
+
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
- chart_from_fsp,
+ spawn_fsps,
linked_charts,
- 'rsi', # eventually will be n-compose syntax
+ fsp_conf,
sym,
ohlcv,
brokermod,
loglevel,
)
- # update last price sticky
- last_price_sticky = chart._ysticks[chart.name]
- last_price_sticky.update_from_data(
- *ohlcv.array[-1][['index', 'close']]
- )
-
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
- vwap_in_history,
+ wap_in_history,
)
# wait for a first quote before we start any update tasks
@@ -797,9 +894,10 @@ async def chart_from_quotes(
chart: ChartPlotWidget,
stream,
ohlcv: np.ndarray,
- vwap_in_history: bool = False,
+ wap_in_history: bool = False,
) -> None:
"""The 'main' (price) chart real-time update loop.
+
"""
# TODO: bunch of stuff:
# - I'm starting to think all this logic should be
@@ -809,24 +907,40 @@ async def chart_from_quotes(
# - update last open price correctly instead
# of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does?
+
+ # update last price sticky
last_price_sticky = chart._ysticks[chart.name]
+ last_price_sticky.update_from_data(
+ *ohlcv.array[-1][['index', 'close']]
+ )
def maxmin():
# TODO: implement this
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
- array = chart._array
+ array = chart._ohlc
+ ifirst = array[0]['index']
+
last_bars_range = chart.bars_range()
l, lbar, rbar, r = last_bars_range
- in_view = array[lbar:rbar]
+ in_view = array[lbar - ifirst:rbar - ifirst]
+
+ assert in_view.size
+
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
+
+ # TODO: when we start using line charts, probably want to make
+ # this an overloaded call on our `DataView
+ # sym = chart.name
+ # mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
+
return last_bars_range, mx, mn
- last_bars_range, last_mx, last_mn = maxmin()
-
chart.default_view()
+ last_bars_range, last_mx, last_mn = maxmin()
+
last, volume = ohlcv.array[-1][['close', 'volume']]
l1 = L1Labels(
@@ -836,9 +950,16 @@ async def chart_from_quotes(
size_digits=min(float_digits(volume), 3)
)
+ # TODO:
+ # - in theory we should be able to read buffer data faster
+ # then msgs arrive.. needs some tinkering and testing
+
+ # - if trade volume jumps above / below prior L1 price
+ # levels this might be dark volume we need to
+ # present differently?
+
async for quotes in stream:
for sym, quote in quotes.items():
- # print(f'CHART: {quote}')
for tick in quote.get('ticks', ()):
@@ -847,7 +968,14 @@ async def chart_from_quotes(
price = tick.get('price')
size = tick.get('size')
- if ticktype in ('trade', 'utrade'):
+ # compute max and min trade values to display in view
+ # TODO: we need a streaming minmax algorithm here, see
+ # def above.
+ brange, mx_in_view, mn_in_view = maxmin()
+ l, lbar, rbar, r = brange
+
+ if ticktype in ('trade', 'utrade', 'last'):
+
array = ohlcv.array
# update price sticky(s)
@@ -856,30 +984,16 @@ async def chart_from_quotes(
*last[['index', 'close']]
)
+ # plot bars
# update price bar
chart.update_ohlc_from_array(
chart.name,
array,
)
- if vwap_in_history:
+ if wap_in_history:
# update vwap overlay line
- chart.update_curve_from_array('vwap', ohlcv.array)
-
- # TODO:
- # - eventually we'll want to update bid/ask labels
- # and other data as subscribed by underlying UI
- # consumers.
- # - in theory we should be able to read buffer data faster
- # then msgs arrive.. needs some tinkering and testing
-
- # if trade volume jumps above / below prior L1 price
- # levels adjust bid / ask lines to match
-
- # compute max and min trade values to display in view
- # TODO: we need a streaming minmax algorithm here, see
- # def above.
- brange, mx_in_view, mn_in_view = maxmin()
+ chart.update_curve_from_array('bar_wap', ohlcv.array)
# XXX: prettty sure this is correct?
# if ticktype in ('trade', 'last'):
@@ -910,7 +1024,7 @@ async def chart_from_quotes(
l1.bid_label.update_from_data(0, price)
# update min price in view to keep bid on screen
- mn_in_view = max(price, mn_in_view)
+ mn_in_view = min(price, mn_in_view)
if mx_in_view > last_mx or mn_in_view < last_mn:
chart._set_yrange(yrange=(mn_in_view, mx_in_view))
@@ -923,9 +1037,10 @@ async def chart_from_quotes(
last_bars_range = brange
-async def chart_from_fsp(
- linked_charts,
- fsp_func_name,
+async def spawn_fsps(
+ linked_charts: LinkedSplitCharts,
+ # fsp_func_name,
+ fsps: Dict[str, str],
sym,
src_shm,
brokermod,
@@ -934,52 +1049,124 @@ async def chart_from_fsp(
"""Start financial signal processing in subactor.
Pass target entrypoint and historical data.
+
"""
- name = f'fsp.{fsp_func_name}'
-
- # TODO: load function here and introspect
- # return stream type(s)
-
- # TODO: should `index` be a required internal field?
- fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
-
+ # spawns sub-processes which execute cpu bound FSP code
async with tractor.open_nursery() as n:
- key = f'{sym}.' + name
- shm, opened = maybe_open_shm_array(
- key,
- # TODO: create entry for each time frame
- dtype=fsp_dtype,
- readonly=True,
+ # spawns local task that consume and chart data streams from
+ # sub-procs
+ async with trio.open_nursery() as ln:
+
+ # Currently we spawn an actor per fsp chain but
+ # likely we'll want to pool them eventually to
+ # scale horizonatlly once cores are used up.
+ for fsp_func_name, conf in fsps.items():
+
+ display_name = f'fsp.{fsp_func_name}'
+
+ # TODO: load function here and introspect
+ # return stream type(s)
+
+ # TODO: should `index` be a required internal field?
+ fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
+
+ key = f'{sym}.' + display_name
+
+ # this is all sync currently
+ shm, opened = maybe_open_shm_array(
+ key,
+ # TODO: create entry for each time frame
+ dtype=fsp_dtype,
+ readonly=True,
+ )
+
+ # XXX: fsp may have been opened by a duplicate chart. Error for
+ # now until we figure out how to wrap fsps as "feeds".
+ assert opened, f"A chart for {key} likely already exists?"
+
+ conf['shm'] = shm
+
+ # spawn closure, can probably define elsewhere
+ async def spawn_fsp_daemon(
+ fsp_name: str,
+ display_name: str,
+ conf: dict,
+ ):
+ """Start an fsp subactor async.
+
+ """
+ print(f'FSP NAME: {fsp_name}')
+ portal = await n.run_in_actor(
+
+ # name as title of sub-chart
+ display_name,
+
+ # subactor entrypoint
+ fsp.cascade,
+ brokername=brokermod.name,
+ src_shm_token=src_shm.token,
+ dst_shm_token=conf['shm'].token,
+ symbol=sym,
+ fsp_func_name=fsp_name,
+
+ # tractor config
+ loglevel=loglevel,
+ )
+
+ stream = await portal.result()
+
+ # receive last index for processed historical
+ # data-array as first msg
+ _ = await stream.receive()
+
+ conf['stream'] = stream
+ conf['portal'] = portal
+
+ # new local task
+ ln.start_soon(
+ spawn_fsp_daemon,
+ fsp_func_name,
+ display_name,
+ conf,
+ )
+
+ # blocks here until all daemons up
+
+ # start and block on update loops
+ async with trio.open_nursery() as ln:
+ for fsp_func_name, conf in fsps.items():
+ ln.start_soon(
+ update_signals,
+ linked_charts,
+ fsp_func_name,
+ conf,
+ )
+
+
+async def update_signals(
+ linked_charts: LinkedSplitCharts,
+ fsp_func_name: str,
+ conf: Dict[str, Any],
+
+) -> None:
+ """FSP stream chart update loop.
+
+ This is called once for each entry in the fsp
+ config map.
+ """
+ shm = conf['shm']
+
+ if conf.get('overlay'):
+ chart = linked_charts.chart
+ chart.draw_curve(
+ name='vwap',
+ data=shm.array,
+ overlay=True,
)
+ last_val_sticky = None
- # XXX: fsp may have been opened by a duplicate chart. Error for
- # now until we figure out how to wrap fsps as "feeds".
- assert opened, f"A chart for {key} likely already exists?"
-
- # start fsp sub-actor
- portal = await n.run_in_actor(
-
- # name as title of sub-chart
- name,
-
- # subactor entrypoint
- fsp.cascade,
- brokername=brokermod.name,
- src_shm_token=src_shm.token,
- dst_shm_token=shm.token,
- symbol=sym,
- fsp_func_name=fsp_func_name,
-
- # tractor config
- loglevel=loglevel,
- )
-
- stream = await portal.result()
-
- # receive last index for processed historical
- # data-array as first msg
- _ = await stream.receive()
+ else:
chart = linked_charts.add_plot(
name=fsp_func_name,
@@ -989,12 +1176,17 @@ async def chart_from_fsp(
ohlc=False,
# settings passed down to ``ChartPlotWidget``
- static_yrange=(0, 100),
+ **conf.get('chart_kwargs', {})
+ # static_yrange=(0, 100),
)
# display contents labels asap
- chart.update_contents_labels(len(shm.array) - 1)
+ chart.update_contents_labels(
+ len(shm.array) - 1,
+ # fsp_func_name
+ )
+ # read last value
array = shm.array
value = array[fsp_func_name][-1]
@@ -1002,33 +1194,42 @@ async def chart_from_fsp(
last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(fsp_func_name, array)
- chart.default_view()
-
- # TODO: figure out if we can roll our own `FillToThreshold` to
- # get brush filled polygons for OS/OB conditions.
- # ``pg.FillBetweenItems`` seems to be one technique using
- # generic fills between curve types while ``PlotCurveItem`` has
- # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
- # might be the best solution?
- # graphics = chart.update_from_array(chart.name, array[fsp_func_name])
- # graphics.curve.setBrush(50, 50, 200, 100)
- # graphics.curve.setFillLevel(50)
-
- # add moveable over-[sold/bought] lines
- level_line(chart, 30)
- level_line(chart, 70, orient_v='top')
chart._shm = shm
- chart._set_yrange()
- # update chart graphics
- async for value in stream:
- # p = pg.debug.Profiler(disabled=False, delayed=False)
- array = shm.array
- value = array[-1][fsp_func_name]
+ # TODO: figure out if we can roll our own `FillToThreshold` to
+ # get brush filled polygons for OS/OB conditions.
+ # ``pg.FillBetweenItems`` seems to be one technique using
+ # generic fills between curve types while ``PlotCurveItem`` has
+ # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which
+ # might be the best solution?
+ # graphics = chart.update_from_array(chart.name, array[fsp_func_name])
+ # graphics.curve.setBrush(50, 50, 200, 100)
+ # graphics.curve.setFillLevel(50)
+
+ # add moveable over-[sold/bought] lines
+ # and labels only for the 70/30 lines
+ level_line(chart, 20, show_label=False)
+ level_line(chart, 30, orient_v='top')
+ level_line(chart, 70, orient_v='bottom')
+ level_line(chart, 80, orient_v='top', show_label=False)
+
+ chart._set_yrange()
+
+ stream = conf['stream']
+
+ # update chart graphics
+ async for value in stream:
+
+ # read last
+ array = shm.array
+ value = array[-1][fsp_func_name]
+
+ if last_val_sticky:
last_val_sticky.update_from_data(-1, value)
- chart.update_curve_from_array(fsp_func_name, array)
- # p('rendered rsi datum')
+
+ # update graphics
+ chart.update_curve_from_array(fsp_func_name, array)
async def check_for_new_bars(feed, ohlcv, linked_charts):
@@ -1079,18 +1280,24 @@ async def check_for_new_bars(feed, ohlcv, linked_charts):
# resize view
# price_chart._set_yrange()
- for name, curve in price_chart._overlays.items():
+ for name in price_chart._overlays:
- # TODO: standard api for signal lookups per plot
- if name in price_chart._array.dtype.fields:
+ price_chart.update_curve_from_array(
+ name,
+ price_chart._arrays[name]
+ )
- # should have already been incremented above
- price_chart.update_curve_from_array(name, price_chart._array)
+ # # TODO: standard api for signal lookups per plot
+ # if name in price_chart._ohlc.dtype.fields:
+
+ # # should have already been incremented above
+ # price_chart.update_curve_from_array(name, price_chart._ohlc)
for name, chart in linked_charts.subplots.items():
chart.update_curve_from_array(chart.name, chart._shm.array)
# chart._set_yrange()
+ # shift the view if in follow mode
price_chart.increment_view()
diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py
index 732db3e2..bbb3633a 100644
--- a/piker/ui/_exec.py
+++ b/piker/ui/_exec.py
@@ -20,12 +20,15 @@ Trio - Qt integration
Run ``trio`` in guest mode on top of the Qt event loop.
All global Qt runtime settings are mostly defined here.
"""
+import os
+import signal
from functools import partial
import traceback
from typing import Tuple, Callable, Dict, Any
# Qt specific
import PyQt5 # noqa
+import pyqtgraph as pg
from pyqtgraph import QtGui
from PyQt5 import QtCore
from PyQt5.QtCore import (
@@ -37,6 +40,12 @@ import tractor
from outcome import Error
+# pyqtgraph global config
+# might as well enable this for now?
+pg.useOpenGL = True
+pg.enableExperimental = True
+
+
# singleton app per actor
_qt_app: QtGui.QApplication = None
_qt_win: QtGui.QMainWindow = None
@@ -67,6 +76,17 @@ class MainWindow(QtGui.QMainWindow):
self.setMinimumSize(*self.size)
self.setWindowTitle(self.title)
+ def closeEvent(
+ self,
+ event: 'QCloseEvent'
+ ) -> None:
+ """Cancel the root actor asap.
+
+ """
+ # raising KBI seems to get intercepted by by Qt so just use the
+ # system.
+ os.kill(os.getpid(), signal.SIGINT)
+
def run_qtractor(
func: Callable,
@@ -115,11 +135,15 @@ def run_qtractor(
def done_callback(outcome):
- print(f"Outcome: {outcome}")
-
if isinstance(outcome, Error):
exc = outcome.error
- traceback.print_exception(type(exc), exc, exc.__traceback__)
+
+ if isinstance(outcome.error, KeyboardInterrupt):
+ # make it kinda look like ``trio``
+ print("Terminated!")
+
+ else:
+ traceback.print_exception(type(exc), exc, exc.__traceback__)
app.quit()
@@ -154,6 +178,7 @@ def run_qtractor(
main,
run_sync_soon_threadsafe=run_sync_soon_threadsafe,
done_callback=done_callback,
+ # restrict_keyboard_interrupt_to_checkpoints=True,
)
window.main_widget = main_widget
diff --git a/piker/ui/_graphics.py b/piker/ui/_graphics.py
index 88193a72..6e968060 100644
--- a/piker/ui/_graphics.py
+++ b/piker/ui/_graphics.py
@@ -17,39 +17,44 @@
"""
Chart graphics for displaying a slew of different data types.
"""
-
-# import time
+import inspect
from typing import List, Optional, Tuple
import numpy as np
import pyqtgraph as pg
-# from numba import jit, float64, optional, int64
+from numba import jit, float64, int64 # , optional
+# from numba import types as ntypes
from PyQt5 import QtCore, QtGui
from PyQt5.QtCore import QLineF, QPointF
-# from .._profile import timeit
+from .._profile import timeit
+# from ..data._source import numba_ohlc_dtype
from ._style import (
_xaxis_at,
hcolor,
_font,
+ _down_2_font_inches_we_like,
)
from ._axes import YAxisLabel, XAxisLabel, YSticky
# XXX: these settings seem to result in really decent mouse scroll
# latency (in terms of perceived lag in cross hair) so really be sure
-# there's an improvement if you want to change it.
-_mouse_rate_limit = 60 # calc current screen refresh rate?
+# there's an improvement if you want to change it!
+_mouse_rate_limit = 60 # TODO; should we calc current screen refresh rate?
_debounce_delay = 1 / 2e3
_ch_label_opac = 1
+# TODO: we need to handle the case where index is outside
+# the underlying datums range
class LineDot(pg.CurvePoint):
def __init__(
self,
curve: pg.PlotCurveItem,
index: int,
+ plot: 'ChartPlotWidget',
pos=None,
size: int = 2, # in pxs
color: str = 'default_light',
@@ -61,6 +66,7 @@ class LineDot(pg.CurvePoint):
pos=pos,
rotate=False,
)
+ self._plot = plot
# TODO: get pen from curve if not defined?
cdefault = hcolor(color)
@@ -80,6 +86,31 @@ class LineDot(pg.CurvePoint):
# keep a static size
self.setFlag(self.ItemIgnoresTransformations)
+ def event(
+ self,
+ ev: QtCore.QEvent,
+ ) -> None:
+ # print((ev, type(ev)))
+ if not isinstance(ev, QtCore.QDynamicPropertyChangeEvent) or self.curve() is None:
+ return False
+
+ # if ev.propertyName() == 'index':
+ # print(ev)
+ # # self.setProperty
+
+ (x, y) = self.curve().getData()
+ index = self.property('index')
+ # first = self._plot._ohlc[0]['index']
+ # first = x[0]
+ # i = index - first
+ i = index - x[0]
+ if i > 0 and i < len(y):
+ newPos = (index, y[i])
+ QtGui.QGraphicsItem.setPos(self, *newPos)
+ return True
+
+ return False
+
_corner_anchors = {
'top': 0,
@@ -91,8 +122,9 @@ _corner_anchors = {
_corner_margins = {
('top', 'left'): (-4, -5),
('top', 'right'): (4, -5),
- ('bottom', 'left'): (-4, 5),
- ('bottom', 'right'): (4, 5),
+
+ ('bottom', 'left'): (-4, lambda font_size: font_size * 2),
+ ('bottom', 'right'): (4, lambda font_size: font_size * 2),
}
@@ -109,7 +141,10 @@ class ContentsLabel(pg.LabelItem):
font_size: Optional[int] = None,
) -> None:
font_size = font_size or _font.font.pixelSize()
- super().__init__(justify=justify_text, size=f'{str(font_size)}px')
+ super().__init__(
+ justify=justify_text,
+ size=f'{str(font_size)}px'
+ )
# anchor to viewbox
self.setParentItem(chart._vb)
@@ -120,6 +155,10 @@ class ContentsLabel(pg.LabelItem):
index = (_corner_anchors[h], _corner_anchors[v])
margins = _corner_margins[(v, h)]
+ ydim = margins[1]
+ if inspect.isfunction(margins[1]):
+ margins = margins[0], ydim(font_size)
+
self.anchor(itemPos=index, parentPos=index, offset=margins)
def update_from_ohlc(
@@ -129,15 +168,19 @@ class ContentsLabel(pg.LabelItem):
array: np.ndarray,
) -> None:
# this being "html" is the dumbest shit :eyeroll:
+ first = array[0]['index']
+
self.setText(
"i:{index}
"
"O:{}
"
"H:{}
"
"L:{}
"
"C:{}
"
- "V:{}".format(
- # *self._array[index].item()[2:8],
- *array[index].item()[2:8],
+ "V:{}
"
+ "wap:{}".format(
+ *array[index - first][
+ ['open', 'high', 'low', 'close', 'volume', 'bar_wap']
+ ],
name=name,
index=index,
)
@@ -149,8 +192,10 @@ class ContentsLabel(pg.LabelItem):
index: int,
array: np.ndarray,
) -> None:
- data = array[index][name]
- self.setText(f"{name}: {data:.2f}")
+ first = array[0]['index']
+ if index < array[-1]['index'] and index > first:
+ data = array[index - first][name]
+ self.setText(f"{name}: {data:.2f}")
class CrossHair(pg.GraphicsObject):
@@ -246,7 +291,7 @@ class CrossHair(pg.GraphicsObject):
) -> LineDot:
# if this plot contains curves add line dot "cursors" to denote
# the current sample under the mouse
- cursor = LineDot(curve, index=len(plot._array))
+ cursor = LineDot(curve, index=plot._ohlc[-1]['index'], plot=plot)
plot.addItem(cursor)
self.graphics[plot].setdefault('cursors', []).append(cursor)
return cursor
@@ -308,8 +353,9 @@ class CrossHair(pg.GraphicsObject):
plot.update_contents_labels(ix)
# update all subscribed curve dots
+ # first = plot._ohlc[0]['index']
for cursor in opts.get('cursors', ()):
- cursor.setIndex(ix)
+ cursor.setIndex(ix) # - first)
# update the label on the bottom of the crosshair
self.xaxis_label.update_label(
@@ -332,96 +378,127 @@ class CrossHair(pg.GraphicsObject):
return self.plots[0].boundingRect()
-# @jit(
-# # float64[:](
-# # float64[:],
-# # optional(float64),
-# # optional(int16)
-# # ),
-# nopython=True,
-# nogil=True
-# )
-def _mk_lines_array(data: List, size: int) -> np.ndarray:
- """Create an ndarray to hold lines graphics objects.
+def _mk_lines_array(
+ data: List,
+ size: int,
+ elements_step: int = 6,
+) -> np.ndarray:
+ """Create an ndarray to hold lines graphics info.
+
"""
return np.zeros_like(
data,
- shape=(int(size), 3),
+ shape=(int(size), elements_step),
dtype=object,
)
-# TODO: `numba` this?
+def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]:
+ open, high, low, close, index = row[
+ ['open', 'high', 'low', 'close', 'index']]
-# @jit(
-# # float64[:](
-# # float64[:],
-# # optional(float64),
-# # optional(int16)
-# # ),
-# nopython=True,
-# nogil=True
-# )
-def bars_from_ohlc(
+ # high -> low vertical (body) line
+ if low != high:
+ hl = QLineF(index, low, index, high)
+ else:
+ # XXX: if we don't do it renders a weird rectangle?
+ # see below for filtering this later...
+ hl = None
+
+ # NOTE: place the x-coord start as "middle" of the drawing range such
+ # that the open arm line-graphic is at the left-most-side of
+ # the index's range according to the view mapping.
+
+ # open line
+ o = QLineF(index - w, open, index, open)
+ # close line
+ c = QLineF(index, close, index + w, close)
+
+ return [hl, o, c]
+
+
+@jit(
+ # TODO: for now need to construct this manually for readonly arrays, see
+ # https://github.com/numba/numba/issues/4511
+ # ntypes.Tuple((float64[:], float64[:], float64[:]))(
+ # numba_ohlc_dtype[::1], # contiguous
+ # int64,
+ # optional(float64),
+ # ),
+ nopython=True,
+ nogil=True
+)
+def path_arrays_from_ohlc(
data: np.ndarray,
- w: float,
- start: int = 0,
+ start: int64,
+ bar_gap: float64 = 0.43,
) -> np.ndarray:
"""Generate an array of lines objects from input ohlc data.
"""
- lines = _mk_lines_array(data, data.shape[0])
+ size = int(data.shape[0] * 6)
- for i, q in enumerate(data[start:], start=start):
- open, high, low, close, index = q[
- ['open', 'high', 'low', 'close', 'index']]
+ x = np.zeros(
+ # data,
+ shape=size,
+ dtype=float64,
+ )
+ y, c = x.copy(), x.copy()
- # high -> low vertical (body) line
- if low != high:
- hl = QLineF(index, low, index, high)
- else:
- # XXX: if we don't do it renders a weird rectangle?
- # see below for filtering this later...
- hl = None
+ # TODO: report bug for assert @
+ # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
+ for i, q in enumerate(data[start:], start):
- # NOTE: place the x-coord start as "middle" of the drawing range such
- # that the open arm line-graphic is at the left-most-side of
- # the index's range according to the view mapping.
+ # TODO: ask numba why this doesn't work..
+ # open, high, low, close, index = q[
+ # ['open', 'high', 'low', 'close', 'index']]
- # open line
- o = QLineF(index - w, open, index, open)
- # close line
- c = QLineF(index, close, index + w, close)
+ open = q['open']
+ high = q['high']
+ low = q['low']
+ close = q['close']
+ index = float64(q['index'])
- # indexing here is as per the below comments
- lines[i] = (hl, o, c)
+ istart = i * 6
+ istop = istart + 6
- # XXX: in theory we could get a further speedup by using a flat
- # array and avoiding the call to `np.ravel()` below?
- # lines[3*i:3*i+3] = (hl, o, c)
+ # x,y detail the 6 points which connect all vertexes of a ohlc bar
+ x[istart:istop] = (
+ index - bar_gap,
+ index,
+ index,
+ index,
+ index,
+ index + bar_gap,
+ )
+ y[istart:istop] = (
+ open,
+ open,
+ low,
+ high,
+ close,
+ close,
+ )
- # XXX: legacy code from candles custom graphics:
- # if not _tina_mode:
- # else _tina_mode:
- # self.lines = lines = np.concatenate(
- # [high_to_low, open_sticks, close_sticks])
- # use traditional up/down green/red coloring
- # long_bars = np.resize(Quotes.close > Quotes.open, len(lines))
- # short_bars = np.resize(
- # Quotes.close < Quotes.open, len(lines))
+ # specifies that the first edge is never connected to the
+ # prior bars last edge thus providing a small "gap"/"space"
+ # between bars determined by ``bar_gap``.
+ c[istart:istop] = (0, 1, 1, 1, 1, 1)
- # ups = lines[long_bars]
- # downs = lines[short_bars]
+ return x, y, c
- # # draw "up" bars
- # p.setPen(self.bull_brush)
- # p.drawLines(*ups)
- # # draw "down" bars
- # p.setPen(self.bear_brush)
- # p.drawLines(*downs)
+# @timeit
+def gen_qpath(
+ data,
+ start, # XXX: do we need this?
+ w,
+) -> QtGui.QPainterPath:
- return lines
+ x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w)
+
+ # TODO: numba the internals of this!
+ return pg.functions.arrayToQPath(x, y, connect=c)
class BarItems(pg.GraphicsObject):
@@ -431,11 +508,10 @@ class BarItems(pg.GraphicsObject):
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43
- bars_pen = pg.mkPen(hcolor('bracket'))
- # XXX: tina mode, see below
- # bull_brush = pg.mkPen('#00cc00')
- # bear_brush = pg.mkPen('#fa0000')
+ # XXX: for the mega-lulz increasing width here increases draw latency...
+ # so probably don't do it until we figure that out.
+ bars_pen = pg.mkPen(hcolor('bracket'))
def __init__(
self,
@@ -443,95 +519,87 @@ class BarItems(pg.GraphicsObject):
plotitem: 'pg.PlotItem', # noqa
) -> None:
super().__init__()
- self.last = QtGui.QPicture()
- self.history = QtGui.QPicture()
- # TODO: implement updateable pixmap solution
+
+ self.last_bar = QtGui.QPicture()
+
+ self.path = QtGui.QPainterPath()
+ # self._h_path = QtGui.QGraphicsPathItem(self.path)
+
self._pi = plotitem
- # self._scene = plotitem.vb.scene()
- # self.picture = QtGui.QPixmap(1000, 300)
- # plotitem.addItem(self.picture)
- # self._pmi = None
- # self._pmi = self._scene.addPixmap(self.picture)
+
+ self._xrange: Tuple[int, int]
+ self._yrange: Tuple[float, float]
# XXX: not sure this actually needs to be an array other
# then for the old tina mode calcs for up/down bars below?
# lines container
- self.lines = _mk_lines_array([], 50e3)
+ # self.lines = _mk_lines_array([], 50e3, 6)
+
+ # TODO: don't render the full backing array each time
+ # self._path_data = None
+ self._last_bar_lines: Optional[Tuple[QLineF, ...]] = None
# track the current length of drawable lines within the larger array
- self.index: int = 0
+ self.start_index: int = 0
+ self.stop_index: int = 0
# @timeit
def draw_from_data(
self,
data: np.ndarray,
start: int = 0,
- ):
+ ) -> QtGui.QPainterPath:
"""Draw OHLC datum graphics from a ``np.ndarray``.
This routine is usually only called to draw the initial history.
"""
- lines = bars_from_ohlc(data, self.w, start=start)
+ self.path = gen_qpath(data, start, self.w)
# save graphics for later reference and keep track
# of current internal "last index"
- index = len(lines)
- self.lines[:index] = lines
- self.index = index
+ # self.start_index = len(data)
+ index = data['index']
+ self._xrange = (index[0], index[-1])
+ self._yrange = (
+ np.nanmax(data['high']),
+ np.nanmin(data['low']),
+ )
# up to last to avoid double draw of last bar
- self.draw_lines(just_history=True, iend=self.index - 1)
- self.draw_lines(iend=self.index)
+ self._last_bar_lines = lines_from_ohlc(data[-1], self.w)
- # @timeit
- def draw_lines(
- self,
- istart=0,
- iend=None,
- just_history=False,
- # TODO: could get even fancier and only update the single close line?
- lines=None,
- ) -> None:
- """Draw the current line set using the painter.
- """
- if just_history:
- # draw bars for the "history" picture
- iend = iend or self.index - 1
- pic = self.history
- else:
- # draw the last bar
- istart = self.index - 1
- iend = iend or self.index
- pic = self.last
+ # create pics
+ # self.draw_history()
+ self.draw_last_bar()
- # use 2d array of lines objects, see conlusion on speed:
- # https://stackoverflow.com/a/60089929
- flat = np.ravel(self.lines[istart:iend])
-
- # TODO: do this with numba for speed gain:
- # https://stackoverflow.com/questions/58422690/filtering-a-numpy-array-what-is-the-best-approach
- to_draw = flat[np.where(flat != None)] # noqa
-
- # pre-computing a QPicture object allows paint() to run much
- # more quickly, rather than re-drawing the shapes every time.
- p = QtGui.QPainter(pic)
- p.setPen(self.bars_pen)
-
- # TODO: is there any way to not have to pass all the lines every
- # iteration? It seems they won't draw unless it's done this way..
- p.drawLines(*to_draw)
- p.end()
-
- # XXX: if we ever try using `QPixmap` again...
- # if self._pmi is None:
- # self._pmi = self.scene().addPixmap(self.picture)
- # else:
- # self._pmi.setPixmap(self.picture)
-
- # trigger re-render
+ # trigger render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update
self.update()
+ return self.path
+
+ # def update_ranges(
+ # self,
+ # xmn: int,
+ # xmx: int,
+ # ymn: float,
+ # ymx: float,
+ # ) -> None:
+ # ...
+
+
+ def draw_last_bar(self) -> None:
+ """Currently this draws lines to a cached ``QPicture`` which
+ is supposed to speed things up on ``.paint()`` calls (which
+ is a call to ``QPainter.drawPicture()`` but I'm not so sure.
+
+ """
+ p = QtGui.QPainter(self.last_bar)
+ p.setPen(self.bars_pen)
+ p.drawLines(*tuple(filter(bool, self._last_bar_lines)))
+ p.end()
+
+ # @timeit
def update_from_array(
self,
array: np.ndarray,
@@ -545,32 +613,65 @@ class BarItems(pg.GraphicsObject):
graphics object, and then update/rerender, but here we're
assuming the prior graphics havent changed (OHLC history rarely
does) so this "should" be simpler and faster.
+
+ This routine should be made (transitively) as fast as possible.
"""
- index = self.index
- length = len(array)
- extra = length - index
+ # index = self.start_index
+ istart, istop = self._xrange
- # start_bar_to_update = index - 100
+ index = array['index']
+ first_index, last_index = index[0], index[-1]
+
+ # length = len(array)
+ prepend_length = istart - first_index
+ append_length = last_index - istop
+
+ # TODO: allow mapping only a range of lines thus
+ # only drawing as many bars as exactly specified.
+
+ if prepend_length:
+
+ # new history was added and we need to render a new path
+ new_bars = array[:prepend_length]
+ prepend_path = gen_qpath(new_bars, 0, self.w)
+
+ # XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
+ # y value not matching the first value from
+ # array[prepend_length + 1] ???
+
+ # update path
+ old_path = self.path
+ self.path = prepend_path
+ self.path.addPath(old_path)
+
+ if append_length:
+ # generate new lines objects for updatable "current bar"
+ self._last_bar_lines = lines_from_ohlc(array[-1], self.w)
+ self.draw_last_bar()
- if extra > 0:
# generate new graphics to match provided array
- new = array[index:index + extra]
- lines = bars_from_ohlc(new, self.w)
- bars_added = len(lines)
- self.lines[index:index + bars_added] = lines
- self.index += bars_added
+ # path appending logic:
+ # we need to get the previous "current bar(s)" for the time step
+ # and convert it to a sub-path to append to the historical set
+ # new_bars = array[istop - 1:istop + append_length - 1]
+ new_bars = array[-append_length - 1:-1]
+ append_path = gen_qpath(new_bars, 0, self.w)
+ self.path.moveTo(float(istop - self.w), float(new_bars[0]['open']))
+ self.path.addPath(append_path)
- # start_bar_to_update = index - bars_added
- self.draw_lines(just_history=True)
- if just_history:
- return
+ self._xrange = first_index, last_index
- # current bar update
+ if just_history:
+ self.update()
+ return
+
+ # last bar update
i, o, h, l, last, v = array[-1][
['index', 'open', 'high', 'low', 'close', 'volume']
]
- assert i == self.index - 1
- body, larm, rarm = self.lines[i]
+ # assert i == self.start_index - 1
+ assert i == last_index
+ body, larm, rarm = self._last_bar_lines
# XXX: is there a faster way to modify this?
rarm.setLine(rarm.x1(), last, rarm.x2(), last)
@@ -579,16 +680,26 @@ class BarItems(pg.GraphicsObject):
if l != h: # noqa
if body is None:
- body = self.lines[index - 1][0] = QLineF(i, l, i, h)
+ body = self._last_bar_lines[0] = QLineF(i, l, i, h)
else:
# update body
body.setLine(i, l, i, h)
- else:
- # XXX: h == l -> remove any HL line to avoid render bug
- if body is not None:
- body = self.lines[index - 1][0] = None
- self.draw_lines(just_history=False)
+ # XXX: pretty sure this is causing an issue where the bar has
+ # a large upward move right before the next sample and the body
+ # is getting set to None since the next bar is flat but the shm
+ # array index update wasn't read by the time this code runs. Iow
+ # we're doing this removal of the body for a bar index that is
+ # now out of date / from some previous sample. It's weird
+ # though because i've seen it do this to bars i - 3 back?
+
+ # else:
+ # # XXX: h == l -> remove any HL line to avoid render bug
+ # if body is not None:
+ # body = self.lines[index - 1][0] = None
+
+ self.draw_last_bar()
+ self.update()
# @timeit
def paint(self, p, opt, widget):
@@ -606,33 +717,36 @@ class BarItems(pg.GraphicsObject):
# as is necesarry for what's in "view". Not sure if this will
# lead to any perf gains other then when zoomed in to less bars
# in view.
- p.drawPicture(0, 0, self.history)
- p.drawPicture(0, 0, self.last)
+ p.drawPicture(0, 0, self.last_bar)
- # TODO: if we can ever make pixmaps work...
- # p.drawPixmap(0, 0, self.picture)
- # self._pmi.setPixmap(self.picture)
- # print(self.scene())
-
- # profiler('bars redraw:')
+ p.setPen(self.bars_pen)
+ p.drawPath(self.path)
+ # @timeit
def boundingRect(self):
- # TODO: can we do rect caching to make this faster?
-
# Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
+
+ # TODO: Can we do rect caching to make this faster
+ # like `pg.PlotCurveItem` does? In theory it's just
+ # computing max/min stuff again like we do in the udpate loop
+ # anyway. Not really sure it's necessary since profiling already
+ # shows this method is faf.
+
# boundingRect _must_ indicate the entire area that will be
# drawn on or else we will get artifacts and possibly crashing.
# (in this case, QPicture does all the work of computing the
# bounding rect for us).
# compute aggregate bounding rectangle
- lb = self.last.boundingRect()
- hb = self.history.boundingRect()
+ lb = self.last_bar.boundingRect()
+ hb = self.path.boundingRect()
+
return QtCore.QRectF(
# top left
QtCore.QPointF(hb.topLeft()),
# total size
- QtCore.QSizeF(lb.size() + hb.size())
+ QtCore.QSizeF(QtCore.QSizeF(lb.size()) + hb.size())
+ # QtCore.QSizeF(lb.size() + hb.size())
)
@@ -785,7 +899,7 @@ class L1Labels:
chart: 'ChartPlotWidget', # noqa
digits: int = 2,
size_digits: int = 0,
- font_size_inches: float = 4 / 53.,
+ font_size_inches: float = _down_2_font_inches_we_like,
) -> None:
self.chart = chart
@@ -839,7 +953,9 @@ def level_line(
digits: int = 1,
# size 4 font on 4k screen scaled down, so small-ish.
- font_size_inches: float = 4 / 53.,
+ font_size_inches: float = _down_2_font_inches_we_like,
+
+ show_label: bool = True,
**linelabelkwargs
) -> LevelLine:
@@ -859,6 +975,7 @@ def level_line(
**linelabelkwargs
)
label.update_from_data(0, level)
+
# TODO: can we somehow figure out a max value from the parent axis?
label._size_br_from_str(label.label_str)
@@ -874,4 +991,7 @@ def level_line(
chart.plotItem.addItem(line)
+ if not show_label:
+ label.hide()
+
return line
diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py
index fbd5694b..464fd80e 100644
--- a/piker/ui/_interaction.py
+++ b/piker/ui/_interaction.py
@@ -245,7 +245,7 @@ class ChartView(ViewBox):
log.debug("Max zoom bruh...")
return
- if ev.delta() < 0 and vl >= len(self.linked_charts._array) + 666:
+ if ev.delta() < 0 and vl >= len(self.linked_charts.chart._ohlc) + 666:
log.debug("Min zoom bruh...")
return
@@ -268,9 +268,9 @@ class ChartView(ViewBox):
# ).map(furthest_right_coord)
# )
- # This seems like the most "intuitive option, a hybrdid of
+ # This seems like the most "intuitive option, a hybrid of
# tws and tv styles
- last_bar = pg.Point(rbar)
+ last_bar = pg.Point(int(rbar))
self._resetTarget()
self.scaleBy(s, last_bar)
diff --git a/piker/ui/_style.py b/piker/ui/_style.py
index 23a3ac09..eeeb6c9c 100644
--- a/piker/ui/_style.py
+++ b/piker/ui/_style.py
@@ -18,6 +18,7 @@
Qt UI styling.
"""
from typing import Optional
+import math
import pyqtgraph as pg
from PyQt5 import QtCore, QtGui
@@ -27,10 +28,9 @@ from ..log import get_logger
log = get_logger(__name__)
-# chart-wide font
-# font size 6px / 53 dpi (3x scaled down on 4k hidpi)
-_default_font_inches_we_like = 6 / 53 # px / (px / inch) = inch
-_down_2_font_inches_we_like = 4 / 53
+# chart-wide fonts specified in inches
+_default_font_inches_we_like = 6 / 96
+_down_2_font_inches_we_like = 5 / 96
class DpiAwareFont:
@@ -66,8 +66,12 @@ class DpiAwareFont:
listed in the script in ``snippets/qt_screen_info.py``.
"""
- dpi = screen.physicalDotsPerInch()
- font_size = round(self._iwl * dpi)
+ # take the max since scaling can make things ugly in some cases
+ pdpi = screen.physicalDotsPerInch()
+ ldpi = screen.logicalDotsPerInch()
+ dpi = max(pdpi, ldpi)
+
+ font_size = math.floor(self._iwl * dpi)
log.info(
f"\nscreen:{screen.name()} with DPI: {dpi}"
f"\nbest font size is {font_size}\n"