Compare commits

...

32 Commits

Author SHA1 Message Date
Tyler Goodlet d01ca0bf96 Use .shield_channel() meth name from tractor 2020-12-17 10:53:30 -05:00
Tyler Goodlet 82c99c5fee Drop profile calls on OHLC bars for now 2020-12-17 10:53:30 -05:00
Tyler Goodlet cd0c75fe40 Add signal backfilling via trio task respawn 2020-12-17 10:53:30 -05:00
Tyler Goodlet 47959c6a2b Drop legacy "historical" QPicture cruft 2020-12-17 10:53:30 -05:00
Tyler Goodlet 873a8d3f3e More general salutation 2020-12-17 10:53:30 -05:00
Tyler Goodlet 2f36b58fbd Stick with slightly smaller fonts 2020-12-17 10:53:30 -05:00
Tyler Goodlet 642d38439d Fix axes for shm primary indexing 2020-12-17 10:53:30 -05:00
Tyler Goodlet f7f2857fe6 Port charting to new shm primary indexing 2020-12-17 10:53:30 -05:00
Tyler Goodlet 05a47c25f4 Close app on last window exit
Use a system triggered SIGINT on app close to tear down the streaming
stack and terminate the `trio`/`tractor` runtimes deterministically.
2020-12-17 10:53:30 -05:00
Tyler Goodlet 18d41d0d24 Port kraken to declare "wap" field 2020-12-17 10:53:30 -05:00
Tyler Goodlet 098db15b2d Port data apis to not touch primary index 2020-12-17 10:53:30 -05:00
Tyler Goodlet 6bae50ba2e Add historical backfilling to ib backend 2020-12-17 10:53:30 -05:00
Tyler Goodlet 3ee4fe7d56 First draft, make graphics work on shm primary index
This is a bit hacky (what with array indexing semantics being relative
to the primary index's "start" value but it works. We'll likely want
to somehow wrap this index finagling into an API soon.
2020-12-17 10:52:18 -05:00
Tyler Goodlet ec0be781f8 Add prepend support to shm system 2020-12-17 10:52:18 -05:00
Tyler Goodlet 3a70f4907a Left align yaxis label 2020-12-17 10:52:18 -05:00
Tyler Goodlet 74315d4c89 Font size tweaks for low dpi 2020-12-17 10:52:18 -05:00
Tyler Goodlet 1d7bd3f748 Use `numpy.divide()` to avoid divide-by-zero 2020-12-17 10:52:18 -05:00
Tyler Goodlet 2a933c3808 Tidy up doc string 2020-12-17 10:52:18 -05:00
Tyler Goodlet 9557292573 Attempt to add numba typing and use `QGraphicsPathItem`
Failed at using either.

Quirks in numba's typing require specifying readonly arrays by
composing types manually.

The graphics item path thing, while it does take less time to write on
bar appends, seems to be slower in general in calculating the
``.boundingRect()`` value. Likely we'll just add manual max/min tracking
on array updates like ``pg.PlotCurveItem`` to squeeze some final juices
on this.
2020-12-17 10:52:18 -05:00
Tyler Goodlet 2b4875957f Drop commented pixmap cruft
See #124 as to why we'll probably never need this.
2020-12-17 10:52:18 -05:00
Tyler Goodlet e36f675eee Get `QPainterPath` "append" working
Pertains further to #109.

Instead of redrawing the entire `QPainterPath` every time there's
a historical bars update just use `.addPath()` to slap in latest
history. It seems to work and is fast. This also seems like it will be
a great strategy for filling in earlier data, woot!
2020-12-17 10:52:18 -05:00
Tyler Goodlet bcce0b5df3 Draw bars using `QPainterPath` magic
This gives a massive speedup when viewing large bar sets (think a day's
worth of 5s bars) by using the `pg.functions.arrayToQPath()` "magic"
binary array writing that is also used in `PlotCurveItem`.  We're using
this same (lower level) function directly to draw bars as part of one
large path and it seems to be painting 15k (ish) bars with around 3ms
`.paint()` latency. The only thing still a bit slow is the path array
generation despite doing it with `numba`. Likely, either having multiple
paths or, only regenerating the missing backing array elements should
speed this up further to avoid slight delays when incrementing the bar
step.

This is of course a first draft and more cleanups are coming.
2020-12-17 10:52:18 -05:00
Tyler Goodlet 4fe8c4487a Add field diffing on failed push 2020-12-17 10:52:18 -05:00
Tyler Goodlet 757f02e6f7 Tweak axis text offset and margins 2020-12-17 10:52:18 -05:00
Tyler Goodlet 80845024da Use new global var stack from tractor 2020-12-17 10:52:18 -05:00
Tyler Goodlet a5f622506d Kill the tractor tree on window close.
This makes it so you don't have to ctrl-c kill apps.
Add in the experimental openGL support even though I'm pretty sure it's
not being used much for curve plotting (but could be wrong).
2020-12-17 10:52:18 -05:00
Tyler Goodlet c183a428ef Add commented ex. code for line price charts 2020-12-17 10:52:18 -05:00
Tyler Goodlet afa15c4616 Allocate space for 2d worth of 5s bars 2020-12-17 10:52:18 -05:00
Tyler Goodlet 4e8739d9ed Break hist calc into wap func, use hlc3. 2020-12-17 10:52:18 -05:00
Tyler Goodlet 0ab0957c6e Put fsp plotting into a couple tasks, startup speedups.
Break the chart update code for fsps into a new task (add a nursery) in
new `spawn_fsps` (was `chart_from_fsps`) that async requests actor
spawning and initial historical data (all CPU bound work).  For multiple
fsp subcharts this allows processing initial output in parallel
(multi-core). We might want to wrap this in a "feed" like api
eventually. Basically the fsp startup sequence is now:
- start all requested fsp actors in an async loop and wait for
  historical data to arrive
- loop through them all again to start update tasks which do chart
  graphics rendering

Add separate x-axis objects for each new subchart (required by
pyqtgraph); still need to fix hiding unnecessary ones.
Add a `ChartPlotWidget._arrays: dict` for holding overlay data distinct
from ohlc. Drop the sizing yrange to label heights for now since it's
pretty much all gone to hell since adding L1 labels. Fix y-stickies to
look up correct overly arrays.
2020-12-17 10:52:18 -05:00
Tyler Goodlet e76eb790f1 Add vwap to exposed fsp map 2020-12-17 10:52:18 -05:00
Tyler Goodlet 6d514a7d5a Add initial tina (ohl3) vwap fsp 2020-12-17 10:52:18 -05:00
15 changed files with 1378 additions and 592 deletions

View File

@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``.
"""
from contextlib import asynccontextmanager, contextmanager
from contextlib import asynccontextmanager
from dataclasses import asdict
from functools import partial
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
import asyncio
import logging
@ -32,6 +33,7 @@ import itertools
import time
from async_generator import aclosing
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails
from ib_insync.ticker import Ticker
import ib_insync as ibis
@ -45,7 +47,7 @@ from ..data import (
maybe_spawn_brokerd,
iterticks,
attach_shm_array,
get_shm_token,
# get_shm_token,
subscribe_ohlc_for_increment,
)
from ..data._source import from_df
@ -86,6 +88,8 @@ _time_frames = {
'Y': 'OneYear',
}
_show_wap_in_history = False
# overrides to sidestep pretty questionable design decisions in
# ``ib_insync``:
@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = {
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
_enters = 0
class Client:
"""IB wrapped for our broker backend API.
@ -142,32 +148,54 @@ class Client:
self.ib = ib
self.ib.RaiseRequestErrors = True
# NOTE: the ib.client here is "throttled" to 45 rps by default
async def bars(
self,
symbol: str,
# EST in ISO 8601 format is required... below is EPOCH
start_date: str = "1970-01-01T00:00:00.000000-05:00",
time_frame: str = '1m',
count: int = int(2e3), # <- max allowed per query
is_paid_feed: bool = False,
start_dt: str = "1970-01-01T00:00:00.000000-05:00",
end_dt: str = "",
sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present.
"""
bars_kwargs = {'whatToShow': 'TRADES'}
global _enters
print(f'ENTER BARS {_enters}')
_enters += 1
contract = await self.find_contract(symbol)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime='',
# durationStr='60 S',
# durationStr='1 D',
endDateTime=end_dt,
# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
# OHLC sampling values:
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
# barSizeSetting='1 secs',
# durationStr='{count} S'.format(count=15000 * 5),
# durationStr='{count} D'.format(count=1),
# barSizeSetting='5 secs',
durationStr='{count} S'.format(count=period_count),
barSizeSetting='1 secs',
# barSizeSetting='1 min',
# time length calcs
durationStr='{count} S'.format(count=5000 * 5),
barSizeSetting='5 secs',
# always use extended hours
useRTH=False,
@ -181,9 +209,13 @@ class Client:
# TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?")
# TODO: rewrite this faster with ``numba``
# convert to pandas dataframe:
df = ibis.util.df(bars)
return from_df(df)
return bars, from_df(df)
def onError(self, reqId, errorCode, errorString, contract) -> None:
breakpoint()
async def search_stocks(
self,
@ -237,6 +269,8 @@ class Client:
"""Get an unqualifed contract for the current "continous" future.
"""
contcon = ibis.ContFuture(symbol, exchange=exchange)
# it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId)
@ -279,10 +313,10 @@ class Client:
if exch in ('PURE', 'TSE'): # non-yankee
currency = 'CAD'
if exch in ('PURE',):
if exch in ('PURE', 'TSE'):
# stupid ib...
primaryExchange = exch
exch = 'SMART'
primaryExchange = 'PURE'
con = ibis.Stock(
symbol=sym,
@ -293,10 +327,27 @@ class Client:
try:
exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0]
head = await self.get_head_time(contract)
print(head)
except IndexError:
raise ValueError(f"No contract could be found {con}")
return contract
async def get_head_time(
self,
contract: Contract,
) -> datetime:
"""Return the first datetime stamp for ``contract``.
"""
return await self.ib.reqHeadTimeStampAsync(
contract,
whatToShow='TRADES',
useRTH=False,
formatDate=2, # timezone aware UTC datetime
)
async def stream_ticker(
self,
symbol: str,
@ -309,7 +360,13 @@ class Client:
contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
def push(t):
"""Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
@ -346,9 +403,17 @@ async def _aio_get_client(
"""
# first check cache for existing client
# breakpoint()
try:
yield _client_cache[(host, port)]
except KeyError:
if port:
client = _client_cache[(host, port)]
else:
# grab first cached client
client = list(_client_cache.values())[0]
yield client
except (KeyError, IndexError):
# TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one.
@ -359,9 +424,11 @@ async def _aio_get_client(
ib = NonShittyIB()
ports = _try_ports if port is None else [port]
_err = None
for port in ports:
try:
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id)
break
except ConnectionRefusedError as ce:
@ -373,6 +440,7 @@ async def _aio_get_client(
try:
client = Client(ib)
_client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}")
yield client
except BaseException:
ib.disconnect()
@ -385,7 +453,6 @@ async def _aio_run_client_method(
from_trio=None,
**kwargs,
) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client:
async_meth = getattr(client, meth)
@ -402,6 +469,9 @@ async def _trio_run_client_method(
method: str,
**kwargs,
) -> None:
"""Asyncio entry point to run tasks against the ``ib_insync`` api.
"""
ca = tractor.current_actor()
assert ca.is_infected_aio()
@ -530,18 +600,60 @@ def normalize(
_local_buffer_writers = {}
@contextmanager
def activate_writer(key: str):
@asynccontextmanager
async def activate_writer(key: str) -> (bool, trio.Nursery):
try:
writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists:
_local_buffer_writers[key] = True
yield writer_already_exists
async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally:
_local_buffer_writers.pop(key, None)
async def fill_bars(
first_bars,
shm,
count: int = 21,
) -> None:
"""Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
"""
next_dt = first_bars[0].date
i = 0
while i < count:
try:
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol='.'.join(
(first_bars.contract.symbol, first_bars.contract.exchange)
),
end_dt=next_dt,
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars[0].date
except RequestError as err:
# TODO: retreive underlying ``ib_insync`` error~~
if err.code == 162:
log.exception(
"Data query rate reached: Press `ctrl-alt-f` in TWS")
await tractor.breakpoint()
# TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api.
# @tractor.msg.pub
@ -575,7 +687,9 @@ async def stream_quotes(
# check if a writer already is alive in a streaming task,
# otherwise start one and mark it as now existing
with activate_writer(shm_token['shm_name']) as writer_already_exists:
async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
@ -588,18 +702,29 @@ async def stream_quotes(
# we are the buffer writer
readonly=False,
)
bars = await _trio_run_client_method(
# async def retrieve_and_push():
start = time.time()
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol=sym,
)
if bars is None:
log.info(f"bars_array request: {time.time() - start}")
if bars_array is None:
raise SymbolNotFound(sym)
# write historical data to buffer
shm.push(bars)
shm.push(bars_array)
shm_token = shm.token
# TODO: generalize this for other brokers
# start bar filler task in bg
ln.start_soon(fill_bars, bars, shm)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
@ -656,6 +781,7 @@ async def stream_quotes(
# real-time stream
async for ticker in stream:
# print(ticker.vwap)
quote = normalize(
ticker,
calc_price=calc_price
@ -674,6 +800,8 @@ async def stream_quotes(
for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price']
# print(f"{quote['symbol']}: {tick}")
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
@ -687,7 +815,13 @@ async def stream_quotes(
# is also the close/last trade price
o = last
shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = (
shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),

View File

@ -57,13 +57,15 @@ _ohlc_dtype = [
('close', float),
('volume', float),
('count', int),
('vwap', float),
('bar_wap', float),
]
# UI components allow this to be declared such that additional
# (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = True
class Client:
@ -341,7 +343,7 @@ async def stream_quotes(
while True:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
'wss://ws.kraken.com/',
) as ws:
# XXX: setup subs
@ -433,7 +435,7 @@ async def stream_quotes(
'high',
'low',
'close',
'vwap',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,

View File

@ -42,7 +42,7 @@ from ._sharedmem import (
ShmArray,
get_shm_token,
)
from ._source import base_ohlc_dtype
from ._source import base_iohlc_dtype
from ._buffer import (
increment_ohlc_buffer,
subscribe_ohlc_for_increment
@ -139,6 +139,7 @@ class Feed:
name: str
stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray
# ticks: ShmArray
_broker_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
@ -188,7 +189,7 @@ async def open_feed(
key=sym_to_shm_key(name, symbols[0]),
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype),
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=True,

View File

@ -91,19 +91,20 @@ async def increment_ohlc_buffer(
# append new entry to buffer thus "incrementing" the bar
array = shm.array
last = array[-1:].copy()
(index, t, close) = last[0][['index', 'time', 'close']]
last = array[-1:][shm._write_fields].copy()
# (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum
last[
['index', 'time', 'volume', 'open', 'high', 'low', 'close']
][0] = (index + 1, t + delay_s, 0, close, close, close, close)
['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer
shm.push(last)
# broadcast the buffer index step
yield {'index': shm._i.value}
yield {'index': shm._last.value}
def subscribe_ohlc_for_increment(

View File

@ -33,6 +33,6 @@ def iterticks(
ticks = quote.get('ticks', ())
if ticks:
for tick in ticks:
print(f"{quote['symbol']}: {tick}")
# print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types:
yield tick

View File

@ -17,11 +17,10 @@
"""
NumPy compatible shared memory buffers for real-time FSP.
"""
from typing import List
from dataclasses import dataclass, asdict
from sys import byteorder
from typing import Tuple, Optional
from multiprocessing import shared_memory
from typing import List, Tuple, Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker
from _posixshmem import shm_unlink
@ -29,7 +28,7 @@ import tractor
import numpy as np
from ..log import get_logger
from ._source import base_ohlc_dtype
from ._source import base_ohlc_dtype, base_iohlc_dtype
log = get_logger(__name__)
@ -58,17 +57,15 @@ mantracker.getfd = mantracker._resource_tracker.getfd
class SharedInt:
"""Wrapper around a single entry shared memory array which
holds an ``int`` value used as an index counter.
"""
def __init__(
self,
token: str,
create: bool = False,
shm: SharedMemory,
) -> None:
# create a single entry array for storing an index counter
self._shm = shared_memory.SharedMemory(
name=token,
create=create,
size=4, # std int
)
self._shm = shm
@property
def value(self) -> int:
@ -79,7 +76,7 @@ class SharedInt:
self._shm.buf[:] = value.to_bytes(4, byteorder)
def destroy(self) -> None:
if shared_memory._USE_POSIX:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
@ -91,7 +88,8 @@ class _Token:
which can be used to key a system wide post shm entry.
"""
shm_name: str # this servers as a "key" value
shm_counter_name: str
shm_first_index_name: str
shm_last_index_name: str
dtype_descr: List[Tuple[str]]
def __post_init__(self):
@ -130,27 +128,47 @@ def _make_token(
"""Create a serializable token that can be used
to access a shared array.
"""
dtype = base_ohlc_dtype if dtype is None else dtype
dtype = base_iohlc_dtype if dtype is None else dtype
return _Token(
key,
key + "_counter",
key + "_first",
key + "_last",
np.dtype(dtype).descr
)
class ShmArray:
"""A shared memory ``numpy`` (compatible) array API.
An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array
can be read and written to by pushing data both onto the "front"
or "back" of a set index range. The indexes for the "first" and
"last" index are themselves stored in shared memory (accessed via
``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index.
"""
def __init__(
self,
shmarr: np.ndarray,
counter: SharedInt,
shm: shared_memory.SharedMemory,
readonly: bool = True,
first: SharedInt,
last: SharedInt,
shm: SharedMemory,
# readonly: bool = True,
) -> None:
self._array = shmarr
self._i = counter
# indexes for first and last indices corresponding
# to fille data
self._first = first
self._last = last
self._len = len(shmarr)
self._shm = shm
self._readonly = readonly
# pushing data does not write the index (aka primary key)
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
# TODO: ringbuf api?
@ -158,24 +176,25 @@ class ShmArray:
def _token(self) -> _Token:
return _Token(
self._shm.name,
self._i._shm.name,
self._first._shm.name,
self._last._shm.name,
self._array.dtype.descr,
)
@property
def token(self) -> dict:
"""Shared memory token that can be serialized
and used by another process to attach to this array.
"""Shared memory token that can be serialized and used by
another process to attach to this array.
"""
return self._token.as_msg()
@property
def index(self) -> int:
return self._i.value % self._len
return self._last.value % self._len
@property
def array(self) -> np.ndarray:
return self._array[:self._i.value]
return self._array[self._first.value:self._last.value]
def last(
self,
@ -186,38 +205,90 @@ class ShmArray:
def push(
self,
data: np.ndarray,
prepend: bool = False,
) -> int:
"""Ring buffer like "push" to append data
into the buffer and return updated index.
into the buffer and return updated "last" index.
"""
length = len(data)
# TODO: use .index for actual ring logic?
index = self._i.value
if prepend:
index = self._first.value - length
else:
index = self._last.value
end = index + length
self._array[index:end] = data[:]
self._i.value = end
fields = self._write_fields
try:
self._array[fields][index:end] = data[fields][:]
if prepend:
self._first.value = index
else:
self._last.value = end
return end
except ValueError as err:
# shoudl raise if diff detected
self.diff_err_fields(data)
raise err
def diff_err_fields(
self,
data: np.ndarray,
) -> None:
# reraise with any field discrepancy
our_fields, their_fields = (
set(self._array.dtype.fields),
set(data.dtype.fields),
)
only_in_ours = our_fields - their_fields
only_in_theirs = their_fields - our_fields
if only_in_ours:
raise TypeError(
f"Input array is missing field(s): {only_in_ours}"
)
elif only_in_theirs:
raise TypeError(
f"Input array has unknown field(s): {only_in_theirs}"
)
def prepend(
self,
data: np.ndarray,
) -> int:
end = self.push(data, prepend=True)
assert end
def close(self) -> None:
self._i._shm.close()
self._first._shm.close()
self._last._shm.close()
self._shm.close()
def destroy(self) -> None:
if shared_memory._USE_POSIX:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
self._i.destroy()
self._first.destroy()
self._last.destroy()
def flush(self) -> None:
# TODO: flush to storage backend like markestore?
...
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 12)
_default_size = 2 * _secs_in_day
def open_shm_array(
key: Optional[str] = None,
# approx number of 5s bars in a "day" x2
size: int = int(2*60*60*10/5),
size: int = _default_size,
dtype: Optional[np.dtype] = None,
readonly: bool = False,
) -> ShmArray:
@ -229,7 +300,9 @@ def open_shm_array(
# create new shared mem segment for which we
# have write permission
a = np.zeros(size, dtype=dtype)
shm = shared_memory.SharedMemory(
a['index'] = np.arange(len(a))
shm = SharedMemory(
name=key,
create=True,
size=a.nbytes
@ -243,17 +316,30 @@ def open_shm_array(
dtype=dtype
)
counter = SharedInt(
token=token.shm_counter_name,
# create single entry arrays for storing an first and last indices
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=True,
size=4, # std int
)
counter.value = 0
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=True,
size=4, # std int
)
)
last.value = first.value = int(_secs_in_day)
shmarr = ShmArray(
array,
counter,
first,
last,
shm,
readonly=readonly,
)
assert shmarr._token == token
@ -261,26 +347,31 @@ def open_shm_array(
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
actor = tractor.current_actor()
actor._lifetime_stack.callback(shmarr.close)
actor._lifetime_stack.callback(shmarr.destroy)
tractor._actor._lifetime_stack.callback(shmarr.close)
tractor._actor._lifetime_stack.callback(shmarr.destroy)
return shmarr
def attach_shm_array(
token: Tuple[str, str, Tuple[str, str]],
size: int = int(60*60*10/5),
size: int = _default_size,
readonly: bool = True,
) -> ShmArray:
"""Load and attach to an existing shared memory array previously
"""Attach to an existing shared memory array previously
created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write
access are constructed.
"""
token = _Token.from_msg(token)
key = token.shm_name
if key in _known_tokens:
assert _known_tokens[key] == token, "WTF"
shm = shared_memory.SharedMemory(name=key)
# attach to array buffer and view as per dtype
shm = SharedMemory(name=key)
shmarr = np.ndarray(
(size,),
dtype=token.dtype_descr,
@ -288,15 +379,29 @@ def attach_shm_array(
)
shmarr.setflags(write=int(not readonly))
counter = SharedInt(token=token.shm_counter_name)
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=False,
size=4, # std int
),
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=False,
size=4, # std int
),
)
# make sure we can read
counter.value
first.value
sha = ShmArray(
shmarr,
counter,
first,
last,
shm,
readonly=readonly,
)
# read test
sha.array
@ -308,8 +413,8 @@ def attach_shm_array(
_known_tokens[key] = token
# "close" attached shm on process teardown
actor = tractor.current_actor()
actor._lifetime_stack.callback(sha.close)
tractor._actor._lifetime_stack.callback(sha.close)
return sha
@ -318,21 +423,20 @@ def maybe_open_shm_array(
dtype: Optional[np.dtype] = None,
**kwargs,
) -> Tuple[ShmArray, bool]:
"""Attempt to attach to a shared memory block by a
"key" determined by the users overall "system"
"""Attempt to attach to a shared memory block using a "key" lookup
to registered blocks in the users overall "system" registryt
(presumes you don't have the block's explicit token).
This function is meant to solve the problem of
discovering whether a shared array token has been
allocated or discovered by the actor running in
**this** process. Systems where multiple actors
may seek to access a common block can use this
function to attempt to acquire a token as discovered
by the actors who have previously stored a
"key" -> ``_Token`` map in an actor local variable.
This function is meant to solve the problem of discovering whether
a shared array token has been allocated or discovered by the actor
running in **this** process. Systems where multiple actors may seek
to access a common block can use this function to attempt to acquire
a token as discovered by the actors who have previously stored
a "key" -> ``_Token`` map in an actor local (aka python global)
variable.
If you know the explicit ``_Token`` for your memory
instead use ``attach_shm_array``.
If you know the explicit ``_Token`` for your memory segment instead
use ``attach_shm_array``.
"""
try:
# see if we already know this key

View File

@ -15,27 +15,36 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Numpy data source machinery.
numpy data source coversion helpers.
"""
import decimal
from dataclasses import dataclass
import numpy as np
import pandas as pd
# from numba import from_dtype
# our minimum structured array layout for ohlc data
base_ohlc_dtype = np.dtype(
[
('index', int),
ohlc_fields = [
('time', float),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', int),
]
)
('bar_wap', float),
]
ohlc_with_index = ohlc_fields.copy()
ohlc_with_index.insert(0, ('index', int))
# our minimum structured array layout for ohlc data
base_iohlc_dtype = np.dtype(ohlc_with_index)
base_ohlc_dtype = np.dtype(ohlc_fields)
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
# map time frame "keys" to minutes values
tf_in_1m = {
@ -110,18 +119,27 @@ def from_df(
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
# most feeds are providing this over sesssion anchored
'vwap': 'bar_wap',
# XXX: ib_insync calls this the "wap of the bar"
# but no clue what is actually is...
# https://github.com/pikers/piker/issues/119#issuecomment-729120988
'average': 'bar_wap',
}
df = df.rename(columns=columns)
for name in df.columns:
if name not in base_ohlc_dtype.names[1:]:
# if name not in base_ohlc_dtype.names[1:]:
if name not in base_ohlc_dtype.names:
del df[name]
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
array = df.to_records()
array = df.to_records(index=False)
_nan_to_closest_num(array)
return array

View File

@ -20,18 +20,24 @@ Financial signal processing for the peeps.
from typing import AsyncIterator, Callable, Tuple
import trio
from trio_typing import TaskStatus
import tractor
import numpy as np
from ..log import get_logger
from .. import data
from ._momo import _rsi
from ._momo import _rsi, _wma
from ._volume import _tina_vwap
from ..data import attach_shm_array, Feed
log = get_logger(__name__)
_fsps = {'rsi': _rsi}
_fsps = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
}
async def latency(
@ -70,7 +76,7 @@ async def increment_signals(
# write new slot to the buffer
dst_shm.push(last)
len(dst_shm.array)
@tractor.stream
@ -95,9 +101,19 @@ async def cascade(
async with data.open_feed(brokername, [symbol]) as feed:
assert src.token == feed.shm.token
async def fsp_compute(
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream):
async def filter_by_sym(
sym: str,
stream,
):
# task cancellation won't kill the channel
with stream.shield_channel():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
@ -112,14 +128,15 @@ async def cascade(
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
#
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for derivatives.
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# Conduct a single iteration of fsp with historical bars input
# and get historical output
@ -133,28 +150,58 @@ async def cascade(
)
history[fsp_func_name] = history_output
# TODO: talk to ``pyqtgraph`` core about proper way to solve this:
# XXX: hack to get curves aligned with bars graphics: prepend
# a copy of the first datum..
# dst.push(history[:1])
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFFZZZ {diff}")
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
yield index
await ctx.send_yield(index)
async with trio.open_nursery() as n:
n.start_soon(increment_signals, feed, dst)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
last_len = new_len = len(src.array)
async with trio.open_nursery() as n:
cs = await n.start(fsp_compute)
# Increment the underlying shared memory buffer on every "increment"
# msg received from the underlying data feed.
async for msg in await feed.index_stream():
new_len = len(src.array)
if new_len > last_len + 1:
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_compute)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
array = dst.array
last = array[-1:].copy()
# write new slot to the buffer
dst.push(last)
last_len = new_len

View File

@ -151,8 +151,8 @@ def wma(
return np.convolve(signal, weights, 'valid')
# @piker.fsp(
# aggregates=[60, 60*5, 60*60, '4H', '1D'],
# @piker.fsp.signal(
# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# )
async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa

View File

@ -0,0 +1,93 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import AsyncIterator, Optional
import numpy as np
from ..data._normalize import iterticks
def wap(
signal: np.ndarray,
weights: np.ndarray,
) -> np.ndarray:
"""Weighted average price from signal and weights.
"""
cum_weights = np.cumsum(weights)
cum_weighted_input = np.cumsum(signal * weights)
# cum_weighted_input / cum_weights
# but, avoid divide by zero errors
avg = np.divide(
cum_weighted_input,
cum_weights,
where=cum_weights != 0
)
return (
avg,
cum_weighted_input,
cum_weights,
)
async def _tina_vwap(
source, #: AsyncStream[np.ndarray],
ohlcv: np.ndarray, # price time-frame "aware"
anchors: Optional[np.ndarray] = None,
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming volume weighted moving average.
Calling this "tina" for now since we're using HLC3 instead of tick.
"""
if anchors is None:
# TODO:
# anchor to session start of data if possible
pass
a = ohlcv.array
chl3 = (a['close'] + a['high'] + a['low']) / 3
v = a['volume']
h_vwap, cum_wp, cum_v = wap(chl3, v)
# deliver historical output as "first yield"
yield h_vwap
w_tot = cum_wp[-1]
v_tot = cum_v[-1]
# vwap_tot = h_vwap[-1]
async for quote in source:
for tick in iterticks(quote, types=['trade']):
# c, h, l, v = ohlcv.array[-1][
# ['closes', 'high', 'low', 'volume']
# ]
# this computes tick-by-tick weightings from here forward
size = tick['size']
price = tick['price']
v_tot += size
w_tot += price * size
# yield ((((o + h + l) / 3) * v) weights_tot) / v_tot
yield w_tot / v_tot

View File

@ -38,7 +38,7 @@ class Axis(pg.AxisItem):
def __init__(
self,
linked_charts,
typical_max_str: str = '100 000.00',
typical_max_str: str = '100 000.000',
min_tick: int = 2,
**kwargs
) -> None:
@ -51,6 +51,8 @@ class Axis(pg.AxisItem):
self.setStyle(**{
'textFillLimits': [(0, 0.666)],
'tickFont': _font.font,
# offset of text *away from* axis line in px
'tickTextOffset': 2,
})
self.setTickFont(_font.font)
@ -88,8 +90,7 @@ class PriceAxis(Axis):
# print(f'digits: {digits}')
return [
('{value:,.{digits}f}')
.format(
('{value:,.{digits}f}').format(
digits=digits,
value=v,
).replace(',', ' ') for v in vals
@ -104,23 +105,36 @@ class DynamicDateAxis(Axis):
60: '%H:%M',
30: '%H:%M:%S',
5: '%H:%M:%S',
1: '%H:%M:%S',
}
def resize(self) -> None:
self.setHeight(self.typical_br.height() + 3)
self.setHeight(self.typical_br.height() + 1)
def _indexes_to_timestrs(
self,
indexes: List[int],
) -> List[str]:
bars = self.linked_charts.chart._array
# try:
chart = self.linked_charts.chart
bars = chart._ohlc
shm = self.linked_charts.chart._shm
first = shm._first.value
bars_len = len(bars)
times = bars['time']
epochs = times[list(
map(int, filter(lambda i: i < bars_len, indexes))
map(
int,
filter(
lambda i: i > 0 and i < bars_len,
(i-first for i in indexes)
)
)
)]
# TODO: **don't** have this hard coded shift to EST
dts = pd.to_datetime(epochs, unit='s') # - 4*pd.offsets.Hour()
@ -228,6 +242,7 @@ class AxisLabel(pg.GraphicsObject):
class XAxisLabel(AxisLabel):
_w_margin = 4
text_flags = (
QtCore.Qt.TextDontClip
@ -255,18 +270,17 @@ class XAxisLabel(AxisLabel):
w = self.boundingRect().width()
self.setPos(QPointF(
abs_pos.x() - w / 2 - offset,
0,
1,
))
self.update()
class YAxisLabel(AxisLabel):
_h_margin = 3
# _w_margin = 1
_h_margin = 2
text_flags = (
# QtCore.Qt.AlignLeft
QtCore.Qt.AlignHCenter
QtCore.Qt.AlignLeft
# QtCore.Qt.AlignHCenter
| QtCore.Qt.AlignVCenter
| QtCore.Qt.TextDontClip
)
@ -283,13 +297,13 @@ class YAxisLabel(AxisLabel):
) -> None:
# this is read inside ``.paint()``
self.label_str = '{value:,.{digits}f}'.format(
self.label_str = ' {value:,.{digits}f}'.format(
digits=self.digits, value=value).replace(',', ' ')
br = self.boundingRect()
h = br.height()
self.setPos(QPointF(
0,
1,
abs_pos.y() - h / 2 - offset
))
self.update()

View File

@ -17,7 +17,7 @@
"""
High level Qt chart widgets.
"""
from typing import Tuple, Dict, Any, Optional
from typing import Tuple, Dict, Any, Optional, Callable
from functools import partial
from PyQt5 import QtCore, QtGui
@ -105,6 +105,7 @@ class ChartSpace(QtGui.QWidget):
self.tf_layout.setContentsMargins(0, 12, 0, 0)
time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN')
btn_prefix = 'TF'
for tf in time_frames:
btn_name = ''.join([btn_prefix, tf])
btn = QtGui.QPushButton(tf)
@ -112,6 +113,7 @@ class ChartSpace(QtGui.QWidget):
btn.setEnabled(False)
setattr(self, btn_name, btn)
self.tf_layout.addWidget(btn)
self.toolbar_layout.addLayout(self.tf_layout)
# XXX: strat loader/saver that we don't need yet.
@ -123,8 +125,11 @@ class ChartSpace(QtGui.QWidget):
self,
symbol: str,
data: np.ndarray,
ohlc: bool = True,
) -> None:
"""Load a new contract into the charting app.
Expects a ``numpy`` structured array containing all the ohlcv fields.
"""
# XXX: let's see if this causes mem problems
self.window.setWindowTitle(f'piker chart {symbol}')
@ -147,7 +152,8 @@ class ChartSpace(QtGui.QWidget):
if not self.v_layout.isEmpty():
self.v_layout.removeWidget(linkedcharts)
main_chart = linkedcharts.plot_main(s, data)
main_chart = linkedcharts.plot_ohlc_main(s, data)
self.v_layout.addWidget(linkedcharts)
return linkedcharts, main_chart
@ -175,7 +181,6 @@ class LinkedSplitCharts(QtGui.QWidget):
def __init__(self):
super().__init__()
self.signals_visible: bool = False
self._array: np.ndarray = None # main data source
self._ch: CrossHair = None # crosshair graphics
self.chart: ChartPlotWidget = None # main (ohlc) chart
self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {}
@ -184,11 +189,6 @@ class LinkedSplitCharts(QtGui.QWidget):
orientation='bottom',
linked_charts=self
)
self.xaxis_ind = DynamicDateAxis(
orientation='bottom',
linked_charts=self
)
# if _xaxis_at == 'bottom':
# self.xaxis.setStyle(showValues=False)
# self.xaxis.hide()
@ -216,20 +216,18 @@ class LinkedSplitCharts(QtGui.QWidget):
sizes.extend([min_h_ind] * len(self.subplots))
self.splitter.setSizes(sizes) # , int(self.height()*0.2)
def plot_main(
def plot_ohlc_main(
self,
symbol: Symbol,
array: np.ndarray,
ohlc: bool = True,
style: str = 'bar',
) -> 'ChartPlotWidget':
"""Start up and show main (price) chart and all linked subcharts.
The data input struct array must include OHLC fields.
"""
self.digits = symbol.digits()
# TODO: this should eventually be a view onto shared mem or some
# higher level type / API
self._array = array
# add crosshairs
self._ch = CrossHair(
linkedsplitcharts=self,
@ -239,11 +237,13 @@ class LinkedSplitCharts(QtGui.QWidget):
name=symbol.key,
array=array,
xaxis=self.xaxis,
ohlc=True,
style=style,
_is_main=True,
)
# add crosshair graphic
self.chart.addItem(self._ch)
# axis placement
if _xaxis_at == 'bottom':
self.chart.hideAxis('bottom')
@ -257,7 +257,7 @@ class LinkedSplitCharts(QtGui.QWidget):
name: str,
array: np.ndarray,
xaxis: DynamicDateAxis = None,
ohlc: bool = False,
style: str = 'line',
_is_main: bool = False,
**cpw_kwargs,
) -> 'ChartPlotWidget':
@ -267,15 +267,25 @@ class LinkedSplitCharts(QtGui.QWidget):
"""
if self.chart is None and not _is_main:
raise RuntimeError(
"A main plot must be created first with `.plot_main()`")
"A main plot must be created first with `.plot_ohlc_main()`")
# source of our custom interactions
cv = ChartView()
cv.linked_charts = self
# use "indicator axis" by default
xaxis = self.xaxis_ind if xaxis is None else xaxis
if xaxis is None:
xaxis = DynamicDateAxis(
orientation='bottom',
linked_charts=self
)
cpw = ChartPlotWidget(
# this name will be used to register the primary
# graphics curve managed by the subchart
name=name,
array=array,
parent=self.splitter,
axisItems={
@ -286,9 +296,12 @@ class LinkedSplitCharts(QtGui.QWidget):
cursor=self._ch,
**cpw_kwargs,
)
# this name will be used to register the primary
# graphics curve managed by the subchart
cpw.name = name
# give viewbox a reference to primary chart
# allowing for kb controls and interactions
# (see our custom view in `._interactions.py`)
cv.chart = cpw
cpw.plotItem.vb.linked_charts = self
cpw.setFrameStyle(QtGui.QFrame.StyledPanel) # | QtGui.QFrame.Plain)
cpw.hideButtons()
@ -302,11 +315,15 @@ class LinkedSplitCharts(QtGui.QWidget):
self._ch.add_plot(cpw)
# draw curve graphics
if ohlc:
if style == 'bar':
cpw.draw_ohlc(name, array)
else:
elif style == 'line':
cpw.draw_curve(name, array)
else:
raise ValueError(f"Chart style {style} is currently unsupported")
if not _is_main:
# track by name
self.subplots[name] = cpw
@ -316,6 +333,8 @@ class LinkedSplitCharts(QtGui.QWidget):
# XXX: we need this right?
# self.splitter.addWidget(cpw)
else:
assert style == 'bar', 'main chart must be OHLC'
return cpw
@ -341,6 +360,7 @@ class ChartPlotWidget(pg.PlotWidget):
def __init__(
self,
# the data view we generate graphics from
name: str,
array: np.ndarray,
static_yrange: Optional[Tuple[float, float]] = None,
cursor: Optional[CrossHair] = None,
@ -353,16 +373,26 @@ class ChartPlotWidget(pg.PlotWidget):
# parent=None,
# plotItem=None,
# antialias=True,
useOpenGL=True,
**kwargs
)
self.name = name
# self.setViewportMargins(0, 0, 0, 0)
self._array = array # readonly view of data
self._ohlc = array # readonly view of ohlc data
self.default_view()
self._arrays = {} # readonly view of overlays
self._graphics = {} # registry of underlying graphics
self._overlays = {} # registry of overlay curves
self._overlays = set() # registry of overlay curve names
self._labels = {} # registry of underlying graphics
self._ysticks = {} # registry of underlying graphics
self._vb = self.plotItem.vb
self._static_yrange = static_yrange # for "known y-range style"
self._view_mode: str = 'follow'
self._cursor = cursor # placehold for mouse
@ -373,6 +403,7 @@ class ChartPlotWidget(pg.PlotWidget):
# show background grid
self.showGrid(x=True, y=True, alpha=0.5)
# TODO: stick in config
# use cross-hair for cursor?
# self.setCursor(QtCore.Qt.CrossCursor)
@ -387,14 +418,25 @@ class ChartPlotWidget(pg.PlotWidget):
self._vb.sigResized.connect(self._set_yrange)
def last_bar_in_view(self) -> bool:
self._array[-1]['index']
def update_contents_labels(self, index: int) -> None:
if index >= 0 and index < len(self._array):
array = self._array
self._ohlc[-1]['index']
def update_contents_labels(
self,
index: int,
# array_name: str,
) -> None:
if index >= 0 and index < self._ohlc[-1]['index']:
for name, (label, update) in self._labels.items():
if name is self.name:
array = self._ohlc
else:
array = self._arrays[name]
try:
update(index, array)
except IndexError:
log.exception(f"Failed to update label: {name}")
def _set_xlimits(
self,
@ -418,8 +460,11 @@ class ChartPlotWidget(pg.PlotWidget):
"""Return a range tuple for the bars present in view.
"""
l, r = self.view_range()
lbar = max(l, 0)
rbar = min(r, len(self._array))
a = self._ohlc
lbar = max(l, a[0]['index'])
rbar = min(r, a[-1]['index'])
# lbar = max(l, 0)
# rbar = min(r, len(self._ohlc))
return l, lbar, rbar, r
def default_view(
@ -429,7 +474,8 @@ class ChartPlotWidget(pg.PlotWidget):
"""Set the view box to the "default" startup view of the scene.
"""
xlast = self._array[index]['index']
xlast = self._ohlc[index]['index']
print(xlast)
begin = xlast - _bars_to_left_in_follow_mode
end = xlast + _bars_from_right_in_follow_mode
@ -450,7 +496,7 @@ class ChartPlotWidget(pg.PlotWidget):
self._vb.setXRange(
min=l + 1,
max=r + 1,
# holy shit, wtf dude... why tf would this not be 0 by
# TODO: holy shit, wtf dude... why tf would this not be 0 by
# default... speechless.
padding=0,
)
@ -465,6 +511,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Draw OHLC datums to chart.
"""
graphics = style(self.plotItem)
# adds all bar/candle graphics objects for each data point in
# the np array buffer to be drawn on next render cycle
self.addItem(graphics)
@ -474,9 +521,11 @@ class ChartPlotWidget(pg.PlotWidget):
self._graphics[name] = graphics
label = ContentsLabel(chart=self, anchor_at=('top', 'left'))
self._labels[name] = (label, partial(label.update_from_ohlc, name))
label.show()
self.add_contents_label(
name,
anchor_at=('top', 'left'),
update_func=ContentsLabel.update_from_ohlc,
)
self.update_contents_labels(len(data) - 1)
self._add_sticky(name)
@ -488,41 +537,52 @@ class ChartPlotWidget(pg.PlotWidget):
name: str,
data: np.ndarray,
overlay: bool = False,
color: str = 'default_light',
add_label: bool = True,
**pdi_kwargs,
) -> pg.PlotDataItem:
# draw the indicator as a plain curve
"""Draw a "curve" (line plot graphics) for the provided data in
the input array ``data``.
"""
_pdi_defaults = {
'pen': pg.mkPen(hcolor('default_light')),
'pen': pg.mkPen(hcolor(color)),
}
pdi_kwargs.update(_pdi_defaults)
curve = pg.PlotDataItem(
data[name],
y=data[name],
x=data['index'],
# antialias=True,
name=name,
# TODO: see how this handles with custom ohlcv bars graphics
clipToView=True,
# and/or if we can implement something similar for OHLC graphics
# clipToView=True,
autoDownsample=True,
downsampleMethod='subsample',
**pdi_kwargs,
)
self.addItem(curve)
# register overlay curve with name
# register curve graphics and backing array for name
self._graphics[name] = curve
self._arrays[name] = data
if overlay:
anchor_at = ('bottom', 'right')
self._overlays[name] = curve
anchor_at = ('bottom', 'left')
self._overlays.add(name)
else:
anchor_at = ('top', 'right')
anchor_at = ('top', 'left')
# TODO: something instead of stickies for overlays
# (we need something that avoids clutter on x-axis).
self._add_sticky(name, bg_color='default_light')
label = ContentsLabel(chart=self, anchor_at=anchor_at)
self._labels[name] = (label, partial(label.update_from_value, name))
label.show()
if add_label:
self.add_contents_label(name, anchor_at=anchor_at)
self.update_contents_labels(len(data) - 1)
if self._cursor:
@ -530,6 +590,23 @@ class ChartPlotWidget(pg.PlotWidget):
return curve
def add_contents_label(
self,
name: str,
anchor_at: Tuple[str, str] = ('top', 'left'),
update_func: Callable = ContentsLabel.update_from_value,
) -> ContentsLabel:
label = ContentsLabel(chart=self, anchor_at=anchor_at)
self._labels[name] = (
# calls class method on instance
label,
partial(update_func, label, name)
)
label.show()
return label
def _add_sticky(
self,
name: str,
@ -556,9 +633,7 @@ class ChartPlotWidget(pg.PlotWidget):
"""Update the named internal graphics from ``array``.
"""
if name not in self._overlays:
self._array = array
self._ohlc = array
graphics = self._graphics[name]
graphics.update_from_array(array, **kwargs)
return graphics
@ -573,12 +648,18 @@ class ChartPlotWidget(pg.PlotWidget):
"""
if name not in self._overlays:
self._array = array
self._ohlc = array
else:
self._arrays[name] = array
curve = self._graphics[name]
# TODO: we should instead implement a diff based
# "only update with new items" on the pg.PlotDataItem
curve.setData(array[name], **kwargs)
# "only update with new items" on the pg.PlotCurveItem
# one place to dig around this might be the `QBackingStore`
# https://doc.qt.io/qt-5/qbackingstore.html
curve.setData(y=array[name], x=array['index'], **kwargs)
return curve
def _set_yrange(
@ -612,8 +693,9 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: logic to check if end of bars in view
extra = view_len - _min_points_to_show
begin = 0 - extra
end = len(self._array) - 1 + extra
begin = self._ohlc[0]['index'] - extra
# end = len(self._ohlc) - 1 + extra
end = self._ohlc[-1]['index'] - 1 + extra
# XXX: test code for only rendering lines for the bars in view.
# This turns out to be very very poor perf when scaling out to
@ -629,10 +711,15 @@ class ChartPlotWidget(pg.PlotWidget):
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
# )
self._set_xlimits(begin, end)
# self._set_xlimits(begin, end)
# TODO: this should be some kind of numpy view api
bars = self._array[lbar:rbar]
# bars = self._ohlc[lbar:rbar]
a = self._ohlc
ifirst = a[0]['index']
bars = a[lbar - ifirst:rbar - ifirst]
if not len(bars):
# likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}")
@ -644,39 +731,42 @@ class ChartPlotWidget(pg.PlotWidget):
ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high'])
except (IndexError, ValueError):
# must be non-ohlc array?
# likely non-ohlc array?
bars = bars[self.name]
ylow = np.nanmin(bars)
yhigh = np.nanmax(bars)
# view margins: stay within a % of the "true range"
diff = yhigh - ylow
ylow = ylow - (diff * 0.04)
# yhigh = yhigh + (diff * 0.01)
yhigh = yhigh + (diff * 0.04)
# compute contents label "height" in view terms
# to avoid having data "contents" overlap with them
if self._labels:
label = self._labels[self.name][0]
# # compute contents label "height" in view terms
# # to avoid having data "contents" overlap with them
# if self._labels:
# label = self._labels[self.name][0]
rect = label.itemRect()
tl, br = rect.topLeft(), rect.bottomRight()
vb = self.plotItem.vb
# rect = label.itemRect()
# tl, br = rect.topLeft(), rect.bottomRight()
# vb = self.plotItem.vb
try:
# on startup labels might not yet be rendered
top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y())
# try:
# # on startup labels might not yet be rendered
# top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y())
# XXX: magic hack, how do we compute exactly?
label_h = (top - bottom) * 0.42
# # XXX: magic hack, how do we compute exactly?
# label_h = (top - bottom) * 0.42
except np.linalg.LinAlgError:
label_h = 0
else:
label_h = 0
# except np.linalg.LinAlgError:
# label_h = 0
# else:
# label_h = 0
# print(f'label height {self.name}: {label_h}')
# # print(f'label height {self.name}: {label_h}')
if label_h > yhigh - ylow:
# if label_h > yhigh - ylow:
# label_h = 0
# print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
label_h = 0
self.setLimits(
@ -715,13 +805,6 @@ async def _async_main(
# chart_app.init_search()
# from ._exec import get_screen
# screen = get_screen(chart_app.geometry().bottomRight())
# XXX: bug zone if you try to ctl-c after this we get hangs again?
# wtf...
# await tractor.breakpoint()
# historical data fetch
brokermod = brokers.get_brokermod(brokername)
@ -735,46 +818,62 @@ async def _async_main(
bars = ohlcv.array
# load in symbol's ohlc data
# await tractor.breakpoint()
linked_charts, chart = chart_app.load_symbol(sym, bars)
# plot historical vwap if available
vwap_in_history = False
if 'vwap' in bars.dtype.fields:
vwap_in_history = True
wap_in_history = False
if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve(
name='vwap',
name='bar_wap',
data=bars,
overlay=True,
add_label=False,
)
chart._set_yrange()
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
# eventually we'll support some kind of n-compose syntax
fsp_conf = {
'vwap': {
'overlay': True,
'anchor': 'session',
},
'rsi': {
'period': 14,
'chart_kwargs': {
'static_yrange': (0, 100),
},
},
}
async with trio.open_nursery() as n:
# load initial fsp chain (otherwise known as "indicators")
n.start_soon(
chart_from_fsp,
spawn_fsps,
linked_charts,
'rsi', # eventually will be n-compose syntax
fsp_conf,
sym,
ohlcv,
brokermod,
loglevel,
)
# update last price sticky
last_price_sticky = chart._ysticks[chart.name]
last_price_sticky.update_from_data(
*ohlcv.array[-1][['index', 'close']]
)
# start graphics update loop(s)after receiving first live quote
n.start_soon(
chart_from_quotes,
chart,
feed.stream,
ohlcv,
vwap_in_history,
wap_in_history,
)
# wait for a first quote before we start any update tasks
@ -797,9 +896,10 @@ async def chart_from_quotes(
chart: ChartPlotWidget,
stream,
ohlcv: np.ndarray,
vwap_in_history: bool = False,
wap_in_history: bool = False,
) -> None:
"""The 'main' (price) chart real-time update loop.
"""
# TODO: bunch of stuff:
# - I'm starting to think all this logic should be
@ -809,24 +909,40 @@ async def chart_from_quotes(
# - update last open price correctly instead
# of copying it from last bar's close
# - 5 sec bar lookback-autocorrection like tws does?
# update last price sticky
last_price_sticky = chart._ysticks[chart.name]
last_price_sticky.update_from_data(
*ohlcv.array[-1][['index', 'close']]
)
def maxmin():
# TODO: implement this
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
array = chart._array
array = chart._ohlc
ifirst = array[0]['index']
last_bars_range = chart.bars_range()
l, lbar, rbar, r = last_bars_range
in_view = array[lbar:rbar]
in_view = array[lbar - ifirst:rbar - ifirst]
assert in_view.size
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
# TODO: when we start using line charts, probably want to make
# this an overloaded call on our `DataView
# sym = chart.name
# mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
return last_bars_range, mx, mn
last_bars_range, last_mx, last_mn = maxmin()
chart.default_view()
last_bars_range, last_mx, last_mn = maxmin()
last, volume = ohlcv.array[-1][['close', 'volume']]
l1 = L1Labels(
@ -836,9 +952,16 @@ async def chart_from_quotes(
size_digits=min(float_digits(volume), 3)
)
# TODO:
# - in theory we should be able to read buffer data faster
# then msgs arrive.. needs some tinkering and testing
# - if trade volume jumps above / below prior L1 price
# levels this might be dark volume we need to
# present differently?
async for quotes in stream:
for sym, quote in quotes.items():
# print(f'CHART: {quote}')
for tick in quote.get('ticks', ()):
@ -847,7 +970,14 @@ async def chart_from_quotes(
price = tick.get('price')
size = tick.get('size')
if ticktype in ('trade', 'utrade'):
# compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see
# def above.
brange, mx_in_view, mn_in_view = maxmin()
l, lbar, rbar, r = brange
if ticktype in ('trade', 'utrade', 'last'):
array = ohlcv.array
# update price sticky(s)
@ -856,30 +986,16 @@ async def chart_from_quotes(
*last[['index', 'close']]
)
# plot bars
# update price bar
chart.update_ohlc_from_array(
chart.name,
array,
)
if vwap_in_history:
if wap_in_history:
# update vwap overlay line
chart.update_curve_from_array('vwap', ohlcv.array)
# TODO:
# - eventually we'll want to update bid/ask labels
# and other data as subscribed by underlying UI
# consumers.
# - in theory we should be able to read buffer data faster
# then msgs arrive.. needs some tinkering and testing
# if trade volume jumps above / below prior L1 price
# levels adjust bid / ask lines to match
# compute max and min trade values to display in view
# TODO: we need a streaming minmax algorithm here, see
# def above.
brange, mx_in_view, mn_in_view = maxmin()
chart.update_curve_from_array('bar_wap', ohlcv.array)
# XXX: prettty sure this is correct?
# if ticktype in ('trade', 'last'):
@ -910,7 +1026,7 @@ async def chart_from_quotes(
l1.bid_label.update_from_data(0, price)
# update min price in view to keep bid on screen
mn_in_view = max(price, mn_in_view)
mn_in_view = min(price, mn_in_view)
if mx_in_view > last_mx or mn_in_view < last_mn:
chart._set_yrange(yrange=(mn_in_view, mx_in_view))
@ -923,9 +1039,10 @@ async def chart_from_quotes(
last_bars_range = brange
async def chart_from_fsp(
linked_charts,
fsp_func_name,
async def spawn_fsps(
linked_charts: LinkedSplitCharts,
# fsp_func_name,
fsps: Dict[str, str],
sym,
src_shm,
brokermod,
@ -934,8 +1051,21 @@ async def chart_from_fsp(
"""Start financial signal processing in subactor.
Pass target entrypoint and historical data.
"""
name = f'fsp.{fsp_func_name}'
# spawns sub-processes which execute cpu bound FSP code
async with tractor.open_nursery() as n:
# spawns local task that consume and chart data streams from
# sub-procs
async with trio.open_nursery() as ln:
# Currently we spawn an actor per fsp chain but
# likely we'll want to pool them eventually to
# scale horizonatlly once cores are used up.
for fsp_func_name, conf in fsps.items():
display_name = f'fsp.{fsp_func_name}'
# TODO: load function here and introspect
# return stream type(s)
@ -943,9 +1073,9 @@ async def chart_from_fsp(
# TODO: should `index` be a required internal field?
fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)])
async with tractor.open_nursery() as n:
key = f'{sym}.' + name
key = f'{sym}.' + display_name
# this is all sync currently
shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
@ -957,19 +1087,30 @@ async def chart_from_fsp(
# now until we figure out how to wrap fsps as "feeds".
assert opened, f"A chart for {key} likely already exists?"
# start fsp sub-actor
conf['shm'] = shm
# spawn closure, can probably define elsewhere
async def spawn_fsp_daemon(
fsp_name: str,
display_name: str,
conf: dict,
):
"""Start an fsp subactor async.
"""
print(f'FSP NAME: {fsp_name}')
portal = await n.run_in_actor(
# name as title of sub-chart
name,
display_name,
# subactor entrypoint
fsp.cascade,
brokername=brokermod.name,
src_shm_token=src_shm.token,
dst_shm_token=shm.token,
dst_shm_token=conf['shm'].token,
symbol=sym,
fsp_func_name=fsp_func_name,
fsp_func_name=fsp_name,
# tractor config
loglevel=loglevel,
@ -981,6 +1122,54 @@ async def chart_from_fsp(
# data-array as first msg
_ = await stream.receive()
conf['stream'] = stream
conf['portal'] = portal
# new local task
ln.start_soon(
spawn_fsp_daemon,
fsp_func_name,
display_name,
conf,
)
# blocks here until all daemons up
# start and block on update loops
async with trio.open_nursery() as ln:
for fsp_func_name, conf in fsps.items():
ln.start_soon(
update_signals,
linked_charts,
fsp_func_name,
conf,
)
async def update_signals(
linked_charts: LinkedSplitCharts,
fsp_func_name: str,
conf: Dict[str, Any],
) -> None:
"""FSP stream chart update loop.
This is called once for each entry in the fsp
config map.
"""
shm = conf['shm']
if conf.get('overlay'):
chart = linked_charts.chart
chart.draw_curve(
name='vwap',
data=shm.array,
overlay=True,
)
last_val_sticky = None
else:
chart = linked_charts.add_plot(
name=fsp_func_name,
array=shm.array,
@ -989,12 +1178,17 @@ async def chart_from_fsp(
ohlc=False,
# settings passed down to ``ChartPlotWidget``
static_yrange=(0, 100),
**conf.get('chart_kwargs', {})
# static_yrange=(0, 100),
)
# display contents labels asap
chart.update_contents_labels(len(shm.array) - 1)
chart.update_contents_labels(
len(shm.array) - 1,
# fsp_func_name
)
# read last value
array = shm.array
value = array[fsp_func_name][-1]
@ -1002,7 +1196,8 @@ async def chart_from_fsp(
last_val_sticky.update_from_data(-1, value)
chart.update_curve_from_array(fsp_func_name, array)
chart.default_view()
chart._shm = shm
# TODO: figure out if we can roll our own `FillToThreshold` to
# get brush filled polygons for OS/OB conditions.
@ -1015,20 +1210,42 @@ async def chart_from_fsp(
# graphics.curve.setFillLevel(50)
# add moveable over-[sold/bought] lines
level_line(chart, 30)
level_line(chart, 70, orient_v='top')
# and labels only for the 70/30 lines
level_line(chart, 20, show_label=False)
level_line(chart, 30, orient_v='top')
level_line(chart, 70, orient_v='bottom')
level_line(chart, 80, orient_v='top', show_label=False)
chart._shm = shm
chart._set_yrange()
stream = conf['stream']
# update chart graphics
async for value in stream:
# p = pg.debug.Profiler(disabled=False, delayed=False)
# TODO: provide a read sync mechanism to avoid this polling.
# the underlying issue is that a backfill and subsequent shm
# array first/last index update could result in an empty array
# read here since the stream is never torn down on the
# re-compute steps.
read_tries = 2
while read_tries > 0:
try:
# read last
array = shm.array
value = array[-1][fsp_func_name]
break
except IndexError:
read_tries -= 1
continue
if last_val_sticky:
last_val_sticky.update_from_data(-1, value)
# update graphics
chart.update_curve_from_array(fsp_func_name, array)
# p('rendered rsi datum')
async def check_for_new_bars(feed, ohlcv, linked_charts):
@ -1079,18 +1296,24 @@ async def check_for_new_bars(feed, ohlcv, linked_charts):
# resize view
# price_chart._set_yrange()
for name, curve in price_chart._overlays.items():
for name in price_chart._overlays:
# TODO: standard api for signal lookups per plot
if name in price_chart._array.dtype.fields:
price_chart.update_curve_from_array(
name,
price_chart._arrays[name]
)
# should have already been incremented above
price_chart.update_curve_from_array(name, price_chart._array)
# # TODO: standard api for signal lookups per plot
# if name in price_chart._ohlc.dtype.fields:
# # should have already been incremented above
# price_chart.update_curve_from_array(name, price_chart._ohlc)
for name, chart in linked_charts.subplots.items():
chart.update_curve_from_array(chart.name, chart._shm.array)
# chart._set_yrange()
# shift the view if in follow mode
price_chart.increment_view()

View File

@ -20,12 +20,15 @@ Trio - Qt integration
Run ``trio`` in guest mode on top of the Qt event loop.
All global Qt runtime settings are mostly defined here.
"""
import os
import signal
from functools import partial
import traceback
from typing import Tuple, Callable, Dict, Any
# Qt specific
import PyQt5 # noqa
import pyqtgraph as pg
from pyqtgraph import QtGui
from PyQt5 import QtCore
from PyQt5.QtCore import (
@ -37,6 +40,12 @@ import tractor
from outcome import Error
# pyqtgraph global config
# might as well enable this for now?
pg.useOpenGL = True
pg.enableExperimental = True
# singleton app per actor
_qt_app: QtGui.QApplication = None
_qt_win: QtGui.QMainWindow = None
@ -67,6 +76,17 @@ class MainWindow(QtGui.QMainWindow):
self.setMinimumSize(*self.size)
self.setWindowTitle(self.title)
def closeEvent(
self,
event: 'QCloseEvent'
) -> None:
"""Cancel the root actor asap.
"""
# raising KBI seems to get intercepted by by Qt so just use the
# system.
os.kill(os.getpid(), signal.SIGINT)
def run_qtractor(
func: Callable,
@ -115,10 +135,14 @@ def run_qtractor(
def done_callback(outcome):
print(f"Outcome: {outcome}")
if isinstance(outcome, Error):
exc = outcome.error
if isinstance(outcome.error, KeyboardInterrupt):
# make it kinda look like ``trio``
print("Terminated!")
else:
traceback.print_exception(type(exc), exc, exc.__traceback__)
app.quit()
@ -154,6 +178,7 @@ def run_qtractor(
main,
run_sync_soon_threadsafe=run_sync_soon_threadsafe,
done_callback=done_callback,
# restrict_keyboard_interrupt_to_checkpoints=True,
)
window.main_widget = main_widget

View File

@ -17,39 +17,44 @@
"""
Chart graphics for displaying a slew of different data types.
"""
# import time
import inspect
from typing import List, Optional, Tuple
import numpy as np
import pyqtgraph as pg
# from numba import jit, float64, optional, int64
from numba import jit, float64, int64 # , optional
# from numba import types as ntypes
from PyQt5 import QtCore, QtGui
from PyQt5.QtCore import QLineF, QPointF
# from .._profile import timeit
from .._profile import timeit
# from ..data._source import numba_ohlc_dtype
from ._style import (
_xaxis_at,
hcolor,
_font,
_down_2_font_inches_we_like,
)
from ._axes import YAxisLabel, XAxisLabel, YSticky
# XXX: these settings seem to result in really decent mouse scroll
# latency (in terms of perceived lag in cross hair) so really be sure
# there's an improvement if you want to change it.
_mouse_rate_limit = 60 # calc current screen refresh rate?
# there's an improvement if you want to change it!
_mouse_rate_limit = 60 # TODO; should we calc current screen refresh rate?
_debounce_delay = 1 / 2e3
_ch_label_opac = 1
# TODO: we need to handle the case where index is outside
# the underlying datums range
class LineDot(pg.CurvePoint):
def __init__(
self,
curve: pg.PlotCurveItem,
index: int,
plot: 'ChartPlotWidget',
pos=None,
size: int = 2, # in pxs
color: str = 'default_light',
@ -61,6 +66,7 @@ class LineDot(pg.CurvePoint):
pos=pos,
rotate=False,
)
self._plot = plot
# TODO: get pen from curve if not defined?
cdefault = hcolor(color)
@ -80,6 +86,31 @@ class LineDot(pg.CurvePoint):
# keep a static size
self.setFlag(self.ItemIgnoresTransformations)
def event(
self,
ev: QtCore.QEvent,
) -> None:
# print((ev, type(ev)))
if not isinstance(ev, QtCore.QDynamicPropertyChangeEvent) or self.curve() is None:
return False
# if ev.propertyName() == 'index':
# print(ev)
# # self.setProperty
(x, y) = self.curve().getData()
index = self.property('index')
# first = self._plot._ohlc[0]['index']
# first = x[0]
# i = index - first
i = index - x[0]
if i > 0 and i < len(y):
newPos = (index, y[i])
QtGui.QGraphicsItem.setPos(self, *newPos)
return True
return False
_corner_anchors = {
'top': 0,
@ -91,8 +122,9 @@ _corner_anchors = {
_corner_margins = {
('top', 'left'): (-4, -5),
('top', 'right'): (4, -5),
('bottom', 'left'): (-4, 5),
('bottom', 'right'): (4, 5),
('bottom', 'left'): (-4, lambda font_size: font_size * 2),
('bottom', 'right'): (4, lambda font_size: font_size * 2),
}
@ -109,7 +141,10 @@ class ContentsLabel(pg.LabelItem):
font_size: Optional[int] = None,
) -> None:
font_size = font_size or _font.font.pixelSize()
super().__init__(justify=justify_text, size=f'{str(font_size)}px')
super().__init__(
justify=justify_text,
size=f'{str(font_size)}px'
)
# anchor to viewbox
self.setParentItem(chart._vb)
@ -120,6 +155,10 @@ class ContentsLabel(pg.LabelItem):
index = (_corner_anchors[h], _corner_anchors[v])
margins = _corner_margins[(v, h)]
ydim = margins[1]
if inspect.isfunction(margins[1]):
margins = margins[0], ydim(font_size)
self.anchor(itemPos=index, parentPos=index, offset=margins)
def update_from_ohlc(
@ -129,15 +168,19 @@ class ContentsLabel(pg.LabelItem):
array: np.ndarray,
) -> None:
# this being "html" is the dumbest shit :eyeroll:
first = array[0]['index']
self.setText(
"<b>i</b>:{index}<br/>"
"<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>"
"<b>L</b>:{}<br/>"
"<b>C</b>:{}<br/>"
"<b>V</b>:{}".format(
# *self._array[index].item()[2:8],
*array[index].item()[2:8],
"<b>V</b>:{}<br/>"
"<b>wap</b>:{}".format(
*array[index - first][
['open', 'high', 'low', 'close', 'volume', 'bar_wap']
],
name=name,
index=index,
)
@ -149,7 +192,9 @@ class ContentsLabel(pg.LabelItem):
index: int,
array: np.ndarray,
) -> None:
data = array[index][name]
first = array[0]['index']
if index < array[-1]['index'] and index > first:
data = array[index - first][name]
self.setText(f"{name}: {data:.2f}")
@ -246,7 +291,7 @@ class CrossHair(pg.GraphicsObject):
) -> LineDot:
# if this plot contains curves add line dot "cursors" to denote
# the current sample under the mouse
cursor = LineDot(curve, index=len(plot._array))
cursor = LineDot(curve, index=plot._ohlc[-1]['index'], plot=plot)
plot.addItem(cursor)
self.graphics[plot].setdefault('cursors', []).append(cursor)
return cursor
@ -308,8 +353,9 @@ class CrossHair(pg.GraphicsObject):
plot.update_contents_labels(ix)
# update all subscribed curve dots
# first = plot._ohlc[0]['index']
for cursor in opts.get('cursors', ()):
cursor.setIndex(ix)
cursor.setIndex(ix) # - first)
# update the label on the bottom of the crosshair
self.xaxis_label.update_label(
@ -332,48 +378,23 @@ class CrossHair(pg.GraphicsObject):
return self.plots[0].boundingRect()
# @jit(
# # float64[:](
# # float64[:],
# # optional(float64),
# # optional(int16)
# # ),
# nopython=True,
# nogil=True
# )
def _mk_lines_array(data: List, size: int) -> np.ndarray:
"""Create an ndarray to hold lines graphics objects.
def _mk_lines_array(
data: List,
size: int,
elements_step: int = 6,
) -> np.ndarray:
"""Create an ndarray to hold lines graphics info.
"""
return np.zeros_like(
data,
shape=(int(size), 3),
shape=(int(size), elements_step),
dtype=object,
)
# TODO: `numba` this?
# @jit(
# # float64[:](
# # float64[:],
# # optional(float64),
# # optional(int16)
# # ),
# nopython=True,
# nogil=True
# )
def bars_from_ohlc(
data: np.ndarray,
w: float,
start: int = 0,
) -> np.ndarray:
"""Generate an array of lines objects from input ohlc data.
"""
lines = _mk_lines_array(data, data.shape[0])
for i, q in enumerate(data[start:], start=start):
open, high, low, close, index = q[
def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]:
open, high, low, close, index = row[
['open', 'high', 'low', 'close', 'index']]
# high -> low vertical (body) line
@ -393,35 +414,91 @@ def bars_from_ohlc(
# close line
c = QLineF(index, close, index + w, close)
# indexing here is as per the below comments
lines[i] = (hl, o, c)
return [hl, o, c]
# XXX: in theory we could get a further speedup by using a flat
# array and avoiding the call to `np.ravel()` below?
# lines[3*i:3*i+3] = (hl, o, c)
# XXX: legacy code from candles custom graphics:
# if not _tina_mode:
# else _tina_mode:
# self.lines = lines = np.concatenate(
# [high_to_low, open_sticks, close_sticks])
# use traditional up/down green/red coloring
# long_bars = np.resize(Quotes.close > Quotes.open, len(lines))
# short_bars = np.resize(
# Quotes.close < Quotes.open, len(lines))
@jit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# ntypes.Tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nopython=True,
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
"""Generate an array of lines objects from input ohlc data.
# ups = lines[long_bars]
# downs = lines[short_bars]
"""
size = int(data.shape[0] * 6)
# # draw "up" bars
# p.setPen(self.bull_brush)
# p.drawLines(*ups)
x = np.zeros(
# data,
shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# # draw "down" bars
# p.setPen(self.bear_brush)
# p.drawLines(*downs)
# TODO: report bug for assert @
# /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
for i, q in enumerate(data[start:], start):
return lines
# TODO: ask numba why this doesn't work..
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
open = q['open']
high = q['high']
low = q['low']
close = q['close']
index = float64(q['index'])
istart = i * 6
istop = istart + 6
# x,y detail the 6 points which connect all vertexes of a ohlc bar
x[istart:istop] = (
index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# specifies that the first edge is never connected to the
# prior bars last edge thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (0, 1, 1, 1, 1, 1)
return x, y, c
# @timeit
def gen_qpath(
data,
start, # XXX: do we need this?
w,
) -> QtGui.QPainterPath:
x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w)
# TODO: numba the internals of this!
return pg.functions.arrayToQPath(x, y, connect=c)
class BarItems(pg.GraphicsObject):
@ -431,11 +508,10 @@ class BarItems(pg.GraphicsObject):
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43
bars_pen = pg.mkPen(hcolor('bracket'))
# XXX: tina mode, see below
# bull_brush = pg.mkPen('#00cc00')
# bear_brush = pg.mkPen('#fa0000')
# XXX: for the mega-lulz increasing width here increases draw latency...
# so probably don't do it until we figure that out.
bars_pen = pg.mkPen(hcolor('bracket'))
def __init__(
self,
@ -443,95 +519,87 @@ class BarItems(pg.GraphicsObject):
plotitem: 'pg.PlotItem', # noqa
) -> None:
super().__init__()
self.last = QtGui.QPicture()
self.history = QtGui.QPicture()
# TODO: implement updateable pixmap solution
self.last_bar = QtGui.QPicture()
self.path = QtGui.QPainterPath()
# self._h_path = QtGui.QGraphicsPathItem(self.path)
self._pi = plotitem
# self._scene = plotitem.vb.scene()
# self.picture = QtGui.QPixmap(1000, 300)
# plotitem.addItem(self.picture)
# self._pmi = None
# self._pmi = self._scene.addPixmap(self.picture)
self._xrange: Tuple[int, int]
self._yrange: Tuple[float, float]
# XXX: not sure this actually needs to be an array other
# then for the old tina mode calcs for up/down bars below?
# lines container
self.lines = _mk_lines_array([], 50e3)
# self.lines = _mk_lines_array([], 50e3, 6)
# TODO: don't render the full backing array each time
# self._path_data = None
self._last_bar_lines: Optional[Tuple[QLineF, ...]] = None
# track the current length of drawable lines within the larger array
self.index: int = 0
self.start_index: int = 0
self.stop_index: int = 0
# @timeit
def draw_from_data(
self,
data: np.ndarray,
start: int = 0,
):
) -> QtGui.QPainterPath:
"""Draw OHLC datum graphics from a ``np.ndarray``.
This routine is usually only called to draw the initial history.
"""
lines = bars_from_ohlc(data, self.w, start=start)
self.path = gen_qpath(data, start, self.w)
# save graphics for later reference and keep track
# of current internal "last index"
index = len(lines)
self.lines[:index] = lines
self.index = index
# self.start_index = len(data)
index = data['index']
self._xrange = (index[0], index[-1])
self._yrange = (
np.nanmax(data['high']),
np.nanmin(data['low']),
)
# up to last to avoid double draw of last bar
self.draw_lines(just_history=True, iend=self.index - 1)
self.draw_lines(iend=self.index)
self._last_bar_lines = lines_from_ohlc(data[-1], self.w)
# @timeit
def draw_lines(
self,
istart=0,
iend=None,
just_history=False,
# TODO: could get even fancier and only update the single close line?
lines=None,
) -> None:
"""Draw the current line set using the painter.
"""
if just_history:
# draw bars for the "history" picture
iend = iend or self.index - 1
pic = self.history
else:
# draw the last bar
istart = self.index - 1
iend = iend or self.index
pic = self.last
# create pics
# self.draw_history()
self.draw_last_bar()
# use 2d array of lines objects, see conlusion on speed:
# https://stackoverflow.com/a/60089929
flat = np.ravel(self.lines[istart:iend])
# TODO: do this with numba for speed gain:
# https://stackoverflow.com/questions/58422690/filtering-a-numpy-array-what-is-the-best-approach
to_draw = flat[np.where(flat != None)] # noqa
# pre-computing a QPicture object allows paint() to run much
# more quickly, rather than re-drawing the shapes every time.
p = QtGui.QPainter(pic)
p.setPen(self.bars_pen)
# TODO: is there any way to not have to pass all the lines every
# iteration? It seems they won't draw unless it's done this way..
p.drawLines(*to_draw)
p.end()
# XXX: if we ever try using `QPixmap` again...
# if self._pmi is None:
# self._pmi = self.scene().addPixmap(self.picture)
# else:
# self._pmi.setPixmap(self.picture)
# trigger re-render
# trigger render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update
self.update()
return self.path
# def update_ranges(
# self,
# xmn: int,
# xmx: int,
# ymn: float,
# ymx: float,
# ) -> None:
# ...
def draw_last_bar(self) -> None:
"""Currently this draws lines to a cached ``QPicture`` which
is supposed to speed things up on ``.paint()`` calls (which
is a call to ``QPainter.drawPicture()`` but I'm not so sure.
"""
p = QtGui.QPainter(self.last_bar)
p.setPen(self.bars_pen)
p.drawLines(*tuple(filter(bool, self._last_bar_lines)))
p.end()
# @timeit
def update_from_array(
self,
array: np.ndarray,
@ -545,32 +613,65 @@ class BarItems(pg.GraphicsObject):
graphics object, and then update/rerender, but here we're
assuming the prior graphics havent changed (OHLC history rarely
does) so this "should" be simpler and faster.
This routine should be made (transitively) as fast as possible.
"""
index = self.index
length = len(array)
extra = length - index
# index = self.start_index
istart, istop = self._xrange
# start_bar_to_update = index - 100
index = array['index']
first_index, last_index = index[0], index[-1]
# length = len(array)
prepend_length = istart - first_index
append_length = last_index - istop
# TODO: allow mapping only a range of lines thus
# only drawing as many bars as exactly specified.
if prepend_length:
# new history was added and we need to render a new path
new_bars = array[:prepend_length]
prepend_path = gen_qpath(new_bars, 0, self.w)
# XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# y value not matching the first value from
# array[prepend_length + 1] ???
# update path
old_path = self.path
self.path = prepend_path
self.path.addPath(old_path)
if append_length:
# generate new lines objects for updatable "current bar"
self._last_bar_lines = lines_from_ohlc(array[-1], self.w)
self.draw_last_bar()
if extra > 0:
# generate new graphics to match provided array
new = array[index:index + extra]
lines = bars_from_ohlc(new, self.w)
bars_added = len(lines)
self.lines[index:index + bars_added] = lines
self.index += bars_added
# path appending logic:
# we need to get the previous "current bar(s)" for the time step
# and convert it to a sub-path to append to the historical set
# new_bars = array[istop - 1:istop + append_length - 1]
new_bars = array[-append_length - 1:-1]
append_path = gen_qpath(new_bars, 0, self.w)
self.path.moveTo(float(istop - self.w), float(new_bars[0]['open']))
self.path.addPath(append_path)
self._xrange = first_index, last_index
# start_bar_to_update = index - bars_added
self.draw_lines(just_history=True)
if just_history:
self.update()
return
# current bar update
# last bar update
i, o, h, l, last, v = array[-1][
['index', 'open', 'high', 'low', 'close', 'volume']
]
assert i == self.index - 1
body, larm, rarm = self.lines[i]
# assert i == self.start_index - 1
assert i == last_index
body, larm, rarm = self._last_bar_lines
# XXX: is there a faster way to modify this?
rarm.setLine(rarm.x1(), last, rarm.x2(), last)
@ -579,16 +680,26 @@ class BarItems(pg.GraphicsObject):
if l != h: # noqa
if body is None:
body = self.lines[index - 1][0] = QLineF(i, l, i, h)
body = self._last_bar_lines[0] = QLineF(i, l, i, h)
else:
# update body
body.setLine(i, l, i, h)
else:
# XXX: h == l -> remove any HL line to avoid render bug
if body is not None:
body = self.lines[index - 1][0] = None
self.draw_lines(just_history=False)
# XXX: pretty sure this is causing an issue where the bar has
# a large upward move right before the next sample and the body
# is getting set to None since the next bar is flat but the shm
# array index update wasn't read by the time this code runs. Iow
# we're doing this removal of the body for a bar index that is
# now out of date / from some previous sample. It's weird
# though because i've seen it do this to bars i - 3 back?
# else:
# # XXX: h == l -> remove any HL line to avoid render bug
# if body is not None:
# body = self.lines[index - 1][0] = None
self.draw_last_bar()
self.update()
# @timeit
def paint(self, p, opt, widget):
@ -606,33 +717,36 @@ class BarItems(pg.GraphicsObject):
# as is necesarry for what's in "view". Not sure if this will
# lead to any perf gains other then when zoomed in to less bars
# in view.
p.drawPicture(0, 0, self.history)
p.drawPicture(0, 0, self.last)
p.drawPicture(0, 0, self.last_bar)
# TODO: if we can ever make pixmaps work...
# p.drawPixmap(0, 0, self.picture)
# self._pmi.setPixmap(self.picture)
# print(self.scene())
# profiler('bars redraw:')
p.setPen(self.bars_pen)
p.drawPath(self.path)
# @timeit
def boundingRect(self):
# TODO: can we do rect caching to make this faster?
# Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
# TODO: Can we do rect caching to make this faster
# like `pg.PlotCurveItem` does? In theory it's just
# computing max/min stuff again like we do in the udpate loop
# anyway. Not really sure it's necessary since profiling already
# shows this method is faf.
# boundingRect _must_ indicate the entire area that will be
# drawn on or else we will get artifacts and possibly crashing.
# (in this case, QPicture does all the work of computing the
# bounding rect for us).
# compute aggregate bounding rectangle
lb = self.last.boundingRect()
hb = self.history.boundingRect()
lb = self.last_bar.boundingRect()
hb = self.path.boundingRect()
return QtCore.QRectF(
# top left
QtCore.QPointF(hb.topLeft()),
# total size
QtCore.QSizeF(lb.size() + hb.size())
QtCore.QSizeF(QtCore.QSizeF(lb.size()) + hb.size())
# QtCore.QSizeF(lb.size() + hb.size())
)
@ -785,7 +899,7 @@ class L1Labels:
chart: 'ChartPlotWidget', # noqa
digits: int = 2,
size_digits: int = 0,
font_size_inches: float = 4 / 53.,
font_size_inches: float = _down_2_font_inches_we_like,
) -> None:
self.chart = chart
@ -839,7 +953,9 @@ def level_line(
digits: int = 1,
# size 4 font on 4k screen scaled down, so small-ish.
font_size_inches: float = 4 / 53.,
font_size_inches: float = _down_2_font_inches_we_like,
show_label: bool = True,
**linelabelkwargs
) -> LevelLine:
@ -859,6 +975,7 @@ def level_line(
**linelabelkwargs
)
label.update_from_data(0, level)
# TODO: can we somehow figure out a max value from the parent axis?
label._size_br_from_str(label.label_str)
@ -874,4 +991,7 @@ def level_line(
chart.plotItem.addItem(line)
if not show_label:
label.hide()
return line

View File

@ -18,6 +18,7 @@
Qt UI styling.
"""
from typing import Optional
import math
import pyqtgraph as pg
from PyQt5 import QtCore, QtGui
@ -27,10 +28,9 @@ from ..log import get_logger
log = get_logger(__name__)
# chart-wide font
# font size 6px / 53 dpi (3x scaled down on 4k hidpi)
_default_font_inches_we_like = 6 / 53 # px / (px / inch) = inch
_down_2_font_inches_we_like = 4 / 53
# chart-wide fonts specified in inches
_default_font_inches_we_like = 6 / 96
_down_2_font_inches_we_like = 5 / 96
class DpiAwareFont:
@ -66,8 +66,12 @@ class DpiAwareFont:
listed in the script in ``snippets/qt_screen_info.py``.
"""
dpi = screen.physicalDotsPerInch()
font_size = round(self._iwl * dpi)
# take the max since scaling can make things ugly in some cases
pdpi = screen.physicalDotsPerInch()
ldpi = screen.logicalDotsPerInch()
dpi = max(pdpi, ldpi)
font_size = math.floor(self._iwl * dpi)
log.info(
f"\nscreen:{screen.name()} with DPI: {dpi}"
f"\nbest font size is {font_size}\n"