Compare commits

...

32 Commits

Author SHA1 Message Date
Tyler Goodlet d01ca0bf96 Use .shield_channel() meth name from tractor 2020-12-17 10:53:30 -05:00
Tyler Goodlet 82c99c5fee Drop profile calls on OHLC bars for now 2020-12-17 10:53:30 -05:00
Tyler Goodlet cd0c75fe40 Add signal backfilling via trio task respawn 2020-12-17 10:53:30 -05:00
Tyler Goodlet 47959c6a2b Drop legacy "historical" QPicture cruft 2020-12-17 10:53:30 -05:00
Tyler Goodlet 873a8d3f3e More general salutation 2020-12-17 10:53:30 -05:00
Tyler Goodlet 2f36b58fbd Stick with slightly smaller fonts 2020-12-17 10:53:30 -05:00
Tyler Goodlet 642d38439d Fix axes for shm primary indexing 2020-12-17 10:53:30 -05:00
Tyler Goodlet f7f2857fe6 Port charting to new shm primary indexing 2020-12-17 10:53:30 -05:00
Tyler Goodlet 05a47c25f4 Close app on last window exit
Use a system triggered SIGINT on app close to tear down the streaming
stack and terminate the `trio`/`tractor` runtimes deterministically.
2020-12-17 10:53:30 -05:00
Tyler Goodlet 18d41d0d24 Port kraken to declare "wap" field 2020-12-17 10:53:30 -05:00
Tyler Goodlet 098db15b2d Port data apis to not touch primary index 2020-12-17 10:53:30 -05:00
Tyler Goodlet 6bae50ba2e Add historical backfilling to ib backend 2020-12-17 10:53:30 -05:00
Tyler Goodlet 3ee4fe7d56 First draft, make graphics work on shm primary index
This is a bit hacky (what with array indexing semantics being relative
to the primary index's "start" value but it works. We'll likely want
to somehow wrap this index finagling into an API soon.
2020-12-17 10:52:18 -05:00
Tyler Goodlet ec0be781f8 Add prepend support to shm system 2020-12-17 10:52:18 -05:00
Tyler Goodlet 3a70f4907a Left align yaxis label 2020-12-17 10:52:18 -05:00
Tyler Goodlet 74315d4c89 Font size tweaks for low dpi 2020-12-17 10:52:18 -05:00
Tyler Goodlet 1d7bd3f748 Use `numpy.divide()` to avoid divide-by-zero 2020-12-17 10:52:18 -05:00
Tyler Goodlet 2a933c3808 Tidy up doc string 2020-12-17 10:52:18 -05:00
Tyler Goodlet 9557292573 Attempt to add numba typing and use `QGraphicsPathItem`
Failed at using either.

Quirks in numba's typing require specifying readonly arrays by
composing types manually.

The graphics item path thing, while it does take less time to write on
bar appends, seems to be slower in general in calculating the
``.boundingRect()`` value. Likely we'll just add manual max/min tracking
on array updates like ``pg.PlotCurveItem`` to squeeze some final juices
on this.
2020-12-17 10:52:18 -05:00
Tyler Goodlet 2b4875957f Drop commented pixmap cruft
See #124 as to why we'll probably never need this.
2020-12-17 10:52:18 -05:00
Tyler Goodlet e36f675eee Get `QPainterPath` "append" working
Pertains further to #109.

Instead of redrawing the entire `QPainterPath` every time there's
a historical bars update just use `.addPath()` to slap in latest
history. It seems to work and is fast. This also seems like it will be
a great strategy for filling in earlier data, woot!
2020-12-17 10:52:18 -05:00
Tyler Goodlet bcce0b5df3 Draw bars using `QPainterPath` magic
This gives a massive speedup when viewing large bar sets (think a day's
worth of 5s bars) by using the `pg.functions.arrayToQPath()` "magic"
binary array writing that is also used in `PlotCurveItem`.  We're using
this same (lower level) function directly to draw bars as part of one
large path and it seems to be painting 15k (ish) bars with around 3ms
`.paint()` latency. The only thing still a bit slow is the path array
generation despite doing it with `numba`. Likely, either having multiple
paths or, only regenerating the missing backing array elements should
speed this up further to avoid slight delays when incrementing the bar
step.

This is of course a first draft and more cleanups are coming.
2020-12-17 10:52:18 -05:00
Tyler Goodlet 4fe8c4487a Add field diffing on failed push 2020-12-17 10:52:18 -05:00
Tyler Goodlet 757f02e6f7 Tweak axis text offset and margins 2020-12-17 10:52:18 -05:00
Tyler Goodlet 80845024da Use new global var stack from tractor 2020-12-17 10:52:18 -05:00
Tyler Goodlet a5f622506d Kill the tractor tree on window close.
This makes it so you don't have to ctrl-c kill apps.
Add in the experimental openGL support even though I'm pretty sure it's
not being used much for curve plotting (but could be wrong).
2020-12-17 10:52:18 -05:00
Tyler Goodlet c183a428ef Add commented ex. code for line price charts 2020-12-17 10:52:18 -05:00
Tyler Goodlet afa15c4616 Allocate space for 2d worth of 5s bars 2020-12-17 10:52:18 -05:00
Tyler Goodlet 4e8739d9ed Break hist calc into wap func, use hlc3. 2020-12-17 10:52:18 -05:00
Tyler Goodlet 0ab0957c6e Put fsp plotting into a couple tasks, startup speedups.
Break the chart update code for fsps into a new task (add a nursery) in
new `spawn_fsps` (was `chart_from_fsps`) that async requests actor
spawning and initial historical data (all CPU bound work).  For multiple
fsp subcharts this allows processing initial output in parallel
(multi-core). We might want to wrap this in a "feed" like api
eventually. Basically the fsp startup sequence is now:
- start all requested fsp actors in an async loop and wait for
  historical data to arrive
- loop through them all again to start update tasks which do chart
  graphics rendering

Add separate x-axis objects for each new subchart (required by
pyqtgraph); still need to fix hiding unnecessary ones.
Add a `ChartPlotWidget._arrays: dict` for holding overlay data distinct
from ohlc. Drop the sizing yrange to label heights for now since it's
pretty much all gone to hell since adding L1 labels. Fix y-stickies to
look up correct overly arrays.
2020-12-17 10:52:18 -05:00
Tyler Goodlet e76eb790f1 Add vwap to exposed fsp map 2020-12-17 10:52:18 -05:00
Tyler Goodlet 6d514a7d5a Add initial tina (ohl3) vwap fsp 2020-12-17 10:52:18 -05:00
15 changed files with 1378 additions and 592 deletions

View File

@ -21,9 +21,10 @@ Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``.
"""
from contextlib import asynccontextmanager, contextmanager
from contextlib import asynccontextmanager
from dataclasses import asdict
from functools import partial
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
import asyncio
import logging
@ -32,6 +33,7 @@ import itertools
import time
from async_generator import aclosing
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails
from ib_insync.ticker import Ticker
import ib_insync as ibis
@ -45,7 +47,7 @@ from ..data import (
maybe_spawn_brokerd,
iterticks,
attach_shm_array,
get_shm_token,
# get_shm_token,
subscribe_ohlc_for_increment,
)
from ..data._source import from_df
@ -86,6 +88,8 @@ _time_frames = {
'Y': 'OneYear',
}
_show_wap_in_history = False
# overrides to sidestep pretty questionable design decisions in
# ``ib_insync``:
@ -128,6 +132,8 @@ _adhoc_cmdty_data_map = {
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}
_enters = 0
class Client:
"""IB wrapped for our broker backend API.
@ -142,32 +148,54 @@ class Client:
self.ib = ib
self.ib.RaiseRequestErrors = True
# NOTE: the ib.client here is "throttled" to 45 rps by default
async def bars(
self,
symbol: str,
# EST in ISO 8601 format is required... below is EPOCH
start_date: str = "1970-01-01T00:00:00.000000-05:00",
time_frame: str = '1m',
count: int = int(2e3), # <- max allowed per query
is_paid_feed: bool = False,
start_dt: str = "1970-01-01T00:00:00.000000-05:00",
end_dt: str = "",
sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query
is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present.
"""
bars_kwargs = {'whatToShow': 'TRADES'}
global _enters
print(f'ENTER BARS {_enters}')
_enters += 1
contract = await self.find_contract(symbol)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
# _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime='',
# durationStr='60 S',
# durationStr='1 D',
endDateTime=end_dt,
# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
# OHLC sampling values:
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
# barSizeSetting='1 secs',
# durationStr='{count} S'.format(count=15000 * 5),
# durationStr='{count} D'.format(count=1),
# barSizeSetting='5 secs',
durationStr='{count} S'.format(count=period_count),
barSizeSetting='1 secs',
# barSizeSetting='1 min',
# time length calcs
durationStr='{count} S'.format(count=5000 * 5),
barSizeSetting='5 secs',
# always use extended hours
useRTH=False,
@ -181,9 +209,13 @@ class Client:
# TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?")
# TODO: rewrite this faster with ``numba``
# convert to pandas dataframe:
df = ibis.util.df(bars)
return from_df(df)
return bars, from_df(df)
def onError(self, reqId, errorCode, errorString, contract) -> None:
breakpoint()
async def search_stocks(
self,
@ -237,6 +269,8 @@ class Client:
"""Get an unqualifed contract for the current "continous" future.
"""
contcon = ibis.ContFuture(symbol, exchange=exchange)
# it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId)
@ -279,10 +313,10 @@ class Client:
if exch in ('PURE', 'TSE'): # non-yankee
currency = 'CAD'
if exch in ('PURE',):
if exch in ('PURE', 'TSE'):
# stupid ib...
primaryExchange = exch
exch = 'SMART'
primaryExchange = 'PURE'
con = ibis.Stock(
symbol=sym,
@ -293,10 +327,27 @@ class Client:
try:
exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0]
head = await self.get_head_time(contract)
print(head)
except IndexError:
raise ValueError(f"No contract could be found {con}")
return contract
async def get_head_time(
self,
contract: Contract,
) -> datetime:
"""Return the first datetime stamp for ``contract``.
"""
return await self.ib.reqHeadTimeStampAsync(
contract,
whatToShow='TRADES',
useRTH=False,
formatDate=2, # timezone aware UTC datetime
)
async def stream_ticker(
self,
symbol: str,
@ -309,7 +360,13 @@ class Client:
contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.
def push(t):
"""Push quotes to trio task.
"""
# log.debug(t)
try:
to_trio.send_nowait(t)
@ -346,9 +403,17 @@ async def _aio_get_client(
"""
# first check cache for existing client
# breakpoint()
try:
yield _client_cache[(host, port)]
except KeyError:
if port:
client = _client_cache[(host, port)]
else:
# grab first cached client
client = list(_client_cache.values())[0]
yield client
except (KeyError, IndexError):
# TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one.
@ -359,9 +424,11 @@ async def _aio_get_client(
ib = NonShittyIB()
ports = _try_ports if port is None else [port]
_err = None
for port in ports:
try:
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id)
break
except ConnectionRefusedError as ce:
@ -373,6 +440,7 @@ async def _aio_get_client(
try:
client = Client(ib)
_client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}")
yield client
except BaseException:
ib.disconnect()
@ -385,7 +453,6 @@ async def _aio_run_client_method(
from_trio=None,
**kwargs,
) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client:
async_meth = getattr(client, meth)
@ -402,6 +469,9 @@ async def _trio_run_client_method(
method: str,
**kwargs,
) -> None:
"""Asyncio entry point to run tasks against the ``ib_insync`` api.
"""
ca = tractor.current_actor()
assert ca.is_infected_aio()
@ -530,18 +600,60 @@ def normalize(
_local_buffer_writers = {}
@contextmanager
def activate_writer(key: str):
@asynccontextmanager
async def activate_writer(key: str) -> (bool, trio.Nursery):
try:
writer_already_exists = _local_buffer_writers.get(key, False)
if not writer_already_exists:
_local_buffer_writers[key] = True
yield writer_already_exists
async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally:
_local_buffer_writers.pop(key, None)
async def fill_bars(
first_bars,
shm,
count: int = 21,
) -> None:
"""Fill historical bars into shared mem / storage afap.
TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128
"""
next_dt = first_bars[0].date
i = 0
while i < count:
try:
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol='.'.join(
(first_bars.contract.symbol, first_bars.contract.exchange)
),
end_dt=next_dt,
)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars[0].date
except RequestError as err:
# TODO: retreive underlying ``ib_insync`` error~~
if err.code == 162:
log.exception(
"Data query rate reached: Press `ctrl-alt-f` in TWS")
await tractor.breakpoint()
# TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api.
# @tractor.msg.pub
@ -575,7 +687,9 @@ async def stream_quotes(
# check if a writer already is alive in a streaming task,
# otherwise start one and mark it as now existing
with activate_writer(shm_token['shm_name']) as writer_already_exists:
async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):
# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
@ -588,18 +702,29 @@ async def stream_quotes(
# we are the buffer writer
readonly=False,
)
bars = await _trio_run_client_method(
# async def retrieve_and_push():
start = time.time()
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol=sym,
)
if bars is None:
log.info(f"bars_array request: {time.time() - start}")
if bars_array is None:
raise SymbolNotFound(sym)
# write historical data to buffer
shm.push(bars)
shm.push(bars_array)
shm_token = shm.token
# TODO: generalize this for other brokers
# start bar filler task in bg
ln.start_soon(fill_bars, bars, shm)
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
@ -656,6 +781,7 @@ async def stream_quotes(
# real-time stream
async for ticker in stream:
# print(ticker.vwap)
quote = normalize(
ticker,
calc_price=calc_price
@ -674,6 +800,8 @@ async def stream_quotes(
for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price']
# print(f"{quote['symbol']}: {tick}")
# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
@ -687,7 +815,13 @@ async def stream_quotes(
# is also the close/last trade price
o = last
shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = (
shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),

View File

@ -57,13 +57,15 @@ _ohlc_dtype = [
('close', float),
('volume', float),
('count', int),
('vwap', float),
('bar_wap', float),
]
# UI components allow this to be declared such that additional
# (historical) fields can be exposed.
ohlc_dtype = np.dtype(_ohlc_dtype)
_show_wap_in_history = True
class Client:
@ -341,7 +343,7 @@ async def stream_quotes(
while True:
try:
async with trio_websocket.open_websocket_url(
'wss://ws.kraken.com',
'wss://ws.kraken.com/',
) as ws:
# XXX: setup subs
@ -433,7 +435,7 @@ async def stream_quotes(
'high',
'low',
'close',
'vwap',
'bar_wap', # in this case vwap of bar
'volume']
][-1] = (
o,

View File

@ -42,7 +42,7 @@ from ._sharedmem import (
ShmArray,
get_shm_token,
)
from ._source import base_ohlc_dtype
from ._source import base_iohlc_dtype
from ._buffer import (
increment_ohlc_buffer,
subscribe_ohlc_for_increment
@ -139,6 +139,7 @@ class Feed:
name: str
stream: AsyncIterator[Dict[str, Any]]
shm: ShmArray
# ticks: ShmArray
_broker_portal: tractor._portal.Portal
_index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None
@ -188,7 +189,7 @@ async def open_feed(
key=sym_to_shm_key(name, symbols[0]),
# use any broker defined ohlc dtype:
dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype),
dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
# we expect the sub-actor to write
readonly=True,

View File

@ -91,19 +91,20 @@ async def increment_ohlc_buffer(
# append new entry to buffer thus "incrementing" the bar
array = shm.array
last = array[-1:].copy()
(index, t, close) = last[0][['index', 'time', 'close']]
last = array[-1:][shm._write_fields].copy()
# (index, t, close) = last[0][['index', 'time', 'close']]
(t, close) = last[0][['time', 'close']]
# this copies non-std fields (eg. vwap) from the last datum
last[
['index', 'time', 'volume', 'open', 'high', 'low', 'close']
][0] = (index + 1, t + delay_s, 0, close, close, close, close)
['time', 'volume', 'open', 'high', 'low', 'close']
][0] = (t + delay_s, 0, close, close, close, close)
# write to the buffer
shm.push(last)
# broadcast the buffer index step
yield {'index': shm._i.value}
yield {'index': shm._last.value}
def subscribe_ohlc_for_increment(

View File

@ -33,6 +33,6 @@ def iterticks(
ticks = quote.get('ticks', ())
if ticks:
for tick in ticks:
print(f"{quote['symbol']}: {tick}")
# print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types:
yield tick

View File

@ -17,11 +17,10 @@
"""
NumPy compatible shared memory buffers for real-time FSP.
"""
from typing import List
from dataclasses import dataclass, asdict
from sys import byteorder
from typing import Tuple, Optional
from multiprocessing import shared_memory
from typing import List, Tuple, Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker
from _posixshmem import shm_unlink
@ -29,7 +28,7 @@ import tractor
import numpy as np
from ..log import get_logger
from ._source import base_ohlc_dtype
from ._source import base_ohlc_dtype, base_iohlc_dtype
log = get_logger(__name__)
@ -58,17 +57,15 @@ mantracker.getfd = mantracker._resource_tracker.getfd
class SharedInt:
"""Wrapper around a single entry shared memory array which
holds an ``int`` value used as an index counter.
"""
def __init__(
self,
token: str,
create: bool = False,
shm: SharedMemory,
) -> None:
# create a single entry array for storing an index counter
self._shm = shared_memory.SharedMemory(
name=token,
create=create,
size=4, # std int
)
self._shm = shm
@property
def value(self) -> int:
@ -79,7 +76,7 @@ class SharedInt:
self._shm.buf[:] = value.to_bytes(4, byteorder)
def destroy(self) -> None:
if shared_memory._USE_POSIX:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
@ -91,7 +88,8 @@ class _Token:
which can be used to key a system wide post shm entry.
"""
shm_name: str # this servers as a "key" value
shm_counter_name: str
shm_first_index_name: str
shm_last_index_name: str
dtype_descr: List[Tuple[str]]
def __post_init__(self):
@ -130,27 +128,47 @@ def _make_token(
"""Create a serializable token that can be used
to access a shared array.
"""
dtype = base_ohlc_dtype if dtype is None else dtype
dtype = base_iohlc_dtype if dtype is None else dtype
return _Token(
key,
key + "_counter",
key + "_first",
key + "_last",
np.dtype(dtype).descr
)
class ShmArray:
"""A shared memory ``numpy`` (compatible) array API.
An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array
can be read and written to by pushing data both onto the "front"
or "back" of a set index range. The indexes for the "first" and
"last" index are themselves stored in shared memory (accessed via
``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index.
"""
def __init__(
self,
shmarr: np.ndarray,
counter: SharedInt,
shm: shared_memory.SharedMemory,
readonly: bool = True,
first: SharedInt,
last: SharedInt,
shm: SharedMemory,
# readonly: bool = True,
) -> None:
self._array = shmarr
self._i = counter
# indexes for first and last indices corresponding
# to fille data
self._first = first
self._last = last
self._len = len(shmarr)
self._shm = shm
self._readonly = readonly
# pushing data does not write the index (aka primary key)
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
# TODO: ringbuf api?
@ -158,24 +176,25 @@ class ShmArray:
def _token(self) -> _Token:
return _Token(
self._shm.name,
self._i._shm.name,
self._first._shm.name,
self._last._shm.name,
self._array.dtype.descr,
)
@property
def token(self) -> dict:
"""Shared memory token that can be serialized
and used by another process to attach to this array.
"""Shared memory token that can be serialized and used by
another process to attach to this array.
"""
return self._token.as_msg()
@property
def index(self) -> int:
return self._i.value % self._len
return self._last.value % self._len
@property
def array(self) -> np.ndarray:
return self._array[:self._i.value]
return self._array[self._first.value:self._last.value]
def last(
self,
@ -186,38 +205,90 @@ class ShmArray:
def push(
self,
data: np.ndarray,
prepend: bool = False,
) -> int:
"""Ring buffer like "push" to append data
into the buffer and return updated index.
into the buffer and return updated "last" index.
"""
length = len(data)
# TODO: use .index for actual ring logic?
index = self._i.value
if prepend:
index = self._first.value - length
else:
index = self._last.value
end = index + length
self._array[index:end] = data[:]
self._i.value = end
return end
fields = self._write_fields
try:
self._array[fields][index:end] = data[fields][:]
if prepend:
self._first.value = index
else:
self._last.value = end
return end
except ValueError as err:
# shoudl raise if diff detected
self.diff_err_fields(data)
raise err
def diff_err_fields(
self,
data: np.ndarray,
) -> None:
# reraise with any field discrepancy
our_fields, their_fields = (
set(self._array.dtype.fields),
set(data.dtype.fields),
)
only_in_ours = our_fields - their_fields
only_in_theirs = their_fields - our_fields
if only_in_ours:
raise TypeError(
f"Input array is missing field(s): {only_in_ours}"
)
elif only_in_theirs:
raise TypeError(
f"Input array has unknown field(s): {only_in_theirs}"
)
def prepend(
self,
data: np.ndarray,
) -> int:
end = self.push(data, prepend=True)
assert end
def close(self) -> None:
self._i._shm.close()
self._first._shm.close()
self._last._shm.close()
self._shm.close()
def destroy(self) -> None:
if shared_memory._USE_POSIX:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
self._i.destroy()
self._first.destroy()
self._last.destroy()
def flush(self) -> None:
# TODO: flush to storage backend like markestore?
...
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 12)
_default_size = 2 * _secs_in_day
def open_shm_array(
key: Optional[str] = None,
# approx number of 5s bars in a "day" x2
size: int = int(2*60*60*10/5),
size: int = _default_size,
dtype: Optional[np.dtype] = None,
readonly: bool = False,
) -> ShmArray:
@ -229,7 +300,9 @@ def open_shm_array(
# create new shared mem segment for which we
# have write permission
a = np.zeros(size, dtype=dtype)
shm = shared_memory.SharedMemory(
a['index'] = np.arange(len(a))
shm = SharedMemory(
name=key,
create=True,
size=a.nbytes
@ -243,17 +316,30 @@ def open_shm_array(
dtype=dtype
)
counter = SharedInt(
token=token.shm_counter_name,
create=True,
# create single entry arrays for storing an first and last indices
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=True,
size=4, # std int
)
)
counter.value = 0
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=True,
size=4, # std int
)
)
last.value = first.value = int(_secs_in_day)
shmarr = ShmArray(
array,
counter,
first,
last,
shm,
readonly=readonly,
)
assert shmarr._token == token
@ -261,26 +347,31 @@ def open_shm_array(
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
actor = tractor.current_actor()
actor._lifetime_stack.callback(shmarr.close)
actor._lifetime_stack.callback(shmarr.destroy)
tractor._actor._lifetime_stack.callback(shmarr.close)
tractor._actor._lifetime_stack.callback(shmarr.destroy)
return shmarr
def attach_shm_array(
token: Tuple[str, str, Tuple[str, str]],
size: int = int(60*60*10/5),
size: int = _default_size,
readonly: bool = True,
) -> ShmArray:
"""Load and attach to an existing shared memory array previously
"""Attach to an existing shared memory array previously
created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write
access are constructed.
"""
token = _Token.from_msg(token)
key = token.shm_name
if key in _known_tokens:
assert _known_tokens[key] == token, "WTF"
shm = shared_memory.SharedMemory(name=key)
# attach to array buffer and view as per dtype
shm = SharedMemory(name=key)
shmarr = np.ndarray(
(size,),
dtype=token.dtype_descr,
@ -288,15 +379,29 @@ def attach_shm_array(
)
shmarr.setflags(write=int(not readonly))
counter = SharedInt(token=token.shm_counter_name)
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=False,
size=4, # std int
),
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=False,
size=4, # std int
),
)
# make sure we can read
counter.value
first.value
sha = ShmArray(
shmarr,
counter,
first,
last,
shm,
readonly=readonly,
)
# read test
sha.array
@ -308,8 +413,8 @@ def attach_shm_array(
_known_tokens[key] = token
# "close" attached shm on process teardown
actor = tractor.current_actor()
actor._lifetime_stack.callback(sha.close)
tractor._actor._lifetime_stack.callback(sha.close)
return sha
@ -318,21 +423,20 @@ def maybe_open_shm_array(
dtype: Optional[np.dtype] = None,
**kwargs,
) -> Tuple[ShmArray, bool]:
"""Attempt to attach to a shared memory block by a
"key" determined by the users overall "system"
"""Attempt to attach to a shared memory block using a "key" lookup
to registered blocks in the users overall "system" registryt
(presumes you don't have the block's explicit token).
This function is meant to solve the problem of
discovering whether a shared array token has been
allocated or discovered by the actor running in
**this** process. Systems where multiple actors
may seek to access a common block can use this
function to attempt to acquire a token as discovered
by the actors who have previously stored a
"key" -> ``_Token`` map in an actor local variable.
This function is meant to solve the problem of discovering whether
a shared array token has been allocated or discovered by the actor
running in **this** process. Systems where multiple actors may seek
to access a common block can use this function to attempt to acquire
a token as discovered by the actors who have previously stored
a "key" -> ``_Token`` map in an actor local (aka python global)
variable.
If you know the explicit ``_Token`` for your memory
instead use ``attach_shm_array``.
If you know the explicit ``_Token`` for your memory segment instead
use ``attach_shm_array``.
"""
try:
# see if we already know this key

View File

@ -15,27 +15,36 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Numpy data source machinery.
numpy data source coversion helpers.
"""
import decimal
from dataclasses import dataclass
import numpy as np
import pandas as pd
# from numba import from_dtype
ohlc_fields = [
('time', float),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', int),
('bar_wap', float),
]
ohlc_with_index = ohlc_fields.copy()
ohlc_with_index.insert(0, ('index', int))
# our minimum structured array layout for ohlc data
base_ohlc_dtype = np.dtype(
[
('index', int),
('time', float),
('open', float),
('high', float),
('low', float),
('close', float),
('volume', int),
]
)
base_iohlc_dtype = np.dtype(ohlc_with_index)
base_ohlc_dtype = np.dtype(ohlc_fields)
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# numba_ohlc_dtype = from_dtype(base_ohlc_dtype)
# map time frame "keys" to minutes values
tf_in_1m = {
@ -110,18 +119,27 @@ def from_df(
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
# most feeds are providing this over sesssion anchored
'vwap': 'bar_wap',
# XXX: ib_insync calls this the "wap of the bar"
# but no clue what is actually is...
# https://github.com/pikers/piker/issues/119#issuecomment-729120988
'average': 'bar_wap',
}
df = df.rename(columns=columns)
for name in df.columns:
if name not in base_ohlc_dtype.names[1:]:
# if name not in base_ohlc_dtype.names[1:]:
if name not in base_ohlc_dtype.names:
del df[name]
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
array = df.to_records()
array = df.to_records(index=False)
_nan_to_closest_num(array)
return array

View File

@ -20,18 +20,24 @@ Financial signal processing for the peeps.
from typing import AsyncIterator, Callable, Tuple
import trio
from trio_typing import TaskStatus
import tractor
import numpy as np
from ..log import get_logger
from .. import data
from ._momo import _rsi
from ._momo import _rsi, _wma
from ._volume import _tina_vwap
from ..data import attach_shm_array, Feed
log = get_logger(__name__)
_fsps = {'rsi': _rsi}
_fsps = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
}
async def latency(
@ -70,7 +76,7 @@ async def increment_signals(
# write new slot to the buffer
dst_shm.push(last)
len(dst_shm.array)
@tractor.stream
@ -95,66 +101,107 @@ async def cascade(
async with data.open_feed(brokername, [symbol]) as feed:
assert src.token == feed.shm.token
# TODO: load appropriate fsp with input args
async def filter_by_sym(sym, stream):
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
async def fsp_compute(
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
out_stream = func(
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# TODO: load appropriate fsp with input args
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
#
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for derivatives.
async def filter_by_sym(
sym: str,
stream,
):
# task cancellation won't kill the channel
with stream.shield_channel():
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
out_stream = func(
filter_by_sym(symbol, feed.stream),
feed.shm,
)
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# TODO: talk to ``pyqtgraph`` core about proper way to solve this:
# XXX: hack to get curves aligned with bars graphics: prepend
# a copy of the first datum..
# dst.push(history[:1])
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFFZZZ {diff}")
for _ in range(diff):
dst.push(history[:1])
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# compare with source signal and time align
index = dst.push(history)
yield index
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
print(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
last_len = new_len = len(src.array)
async with trio.open_nursery() as n:
n.start_soon(increment_signals, feed, dst)
async for processed in out_stream:
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
await ctx.send_yield(index)
cs = await n.start(fsp_compute)
# Increment the underlying shared memory buffer on every "increment"
# msg received from the underlying data feed.
async for msg in await feed.index_stream():
new_len = len(src.array)
if new_len > last_len + 1:
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_compute)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
array = dst.array
last = array[-1:].copy()
# write new slot to the buffer
dst.push(last)
last_len = new_len

View File

@ -151,8 +151,8 @@ def wma(
return np.convolve(signal, weights, 'valid')
# @piker.fsp(
# aggregates=[60, 60*5, 60*60, '4H', '1D'],
# @piker.fsp.signal(
# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# )
async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa

View File

@ -0,0 +1,93 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import AsyncIterator, Optional
import numpy as np
from ..data._normalize import iterticks
def wap(
signal: np.ndarray,
weights: np.ndarray,
) -> np.ndarray:
"""Weighted average price from signal and weights.
"""
cum_weights = np.cumsum(weights)
cum_weighted_input = np.cumsum(signal * weights)
# cum_weighted_input / cum_weights
# but, avoid divide by zero errors
avg = np.divide(
cum_weighted_input,
cum_weights,
where=cum_weights != 0
)
return (
avg,
cum_weighted_input,
cum_weights,
)
async def _tina_vwap(
source, #: AsyncStream[np.ndarray],
ohlcv: np.ndarray, # price time-frame "aware"
anchors: Optional[np.ndarray] = None,
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming volume weighted moving average.
Calling this "tina" for now since we're using HLC3 instead of tick.
"""
if anchors is None:
# TODO:
# anchor to session start of data if possible
pass
a = ohlcv.array
chl3 = (a['close'] + a['high'] + a['low']) / 3
v = a['volume']
h_vwap, cum_wp, cum_v = wap(chl3, v)
# deliver historical output as "first yield"
yield h_vwap
w_tot = cum_wp[-1]
v_tot = cum_v[-1]
# vwap_tot = h_vwap[-1]
async for quote in source:
for tick in iterticks(quote, types=['trade']):
# c, h, l, v = ohlcv.array[-1][
# ['closes', 'high', 'low', 'volume']
# ]
# this computes tick-by-tick weightings from here forward
size = tick['size']
price = tick['price']
v_tot += size
w_tot += price * size
# yield ((((o + h + l) / 3) * v) weights_tot) / v_tot
yield w_tot / v_tot

View File

@ -38,7 +38,7 @@ class Axis(pg.AxisItem):
def __init__(
self,
linked_charts,
typical_max_str: str = '100 000.00',
typical_max_str: str = '100 000.000',
min_tick: int = 2,
**kwargs
) -> None:
@ -51,6 +51,8 @@ class Axis(pg.AxisItem):
self.setStyle(**{
'textFillLimits': [(0, 0.666)],
'tickFont': _font.font,
# offset of text *away from* axis line in px
'tickTextOffset': 2,
})
self.setTickFont(_font.font)
@ -88,11 +90,10 @@ class PriceAxis(Axis):
# print(f'digits: {digits}')
return [
('{value:,.{digits}f}')
.format(
digits=digits,
value=v,
).replace(',', ' ') for v in vals
('{value:,.{digits}f}').format(
digits=digits,
value=v,
).replace(',', ' ') for v in vals
]
@ -104,23 +105,36 @@ class DynamicDateAxis(Axis):
60: '%H:%M',
30: '%H:%M:%S',
5: '%H:%M:%S',
1: '%H:%M:%S',
}
def resize(self) -> None:
self.setHeight(self.typical_br.height() + 3)
self.setHeight(self.typical_br.height() + 1)
def _indexes_to_timestrs(
self,
indexes: List[int],
) -> List[str]:
bars = self.linked_charts.chart._array
# try:
chart = self.linked_charts.chart
bars = chart._ohlc
shm = self.linked_charts.chart._shm
first = shm._first.value
bars_len = len(bars)
times = bars['time']
epochs = times[list(
map(int, filter(lambda i: i < bars_len, indexes))
map(
int,
filter(
lambda i: i > 0 and i < bars_len,
(i-first for i in indexes)
)
)
)]
# TODO: **don't** have this hard coded shift to EST
dts = pd.to_datetime(epochs, unit='s') # - 4*pd.offsets.Hour()
@ -228,6 +242,7 @@ class AxisLabel(pg.GraphicsObject):
class XAxisLabel(AxisLabel):
_w_margin = 4
text_flags = (
QtCore.Qt.TextDontClip
@ -255,18 +270,17 @@ class XAxisLabel(AxisLabel):
w = self.boundingRect().width()
self.setPos(QPointF(
abs_pos.x() - w / 2 - offset,
0,
1,
))
self.update()
class YAxisLabel(AxisLabel):
_h_margin = 3
# _w_margin = 1
_h_margin = 2
text_flags = (
# QtCore.Qt.AlignLeft
QtCore.Qt.AlignHCenter
QtCore.Qt.AlignLeft
# QtCore.Qt.AlignHCenter
| QtCore.Qt.AlignVCenter
| QtCore.Qt.TextDontClip
)
@ -283,13 +297,13 @@ class YAxisLabel(AxisLabel):
) -> None:
# this is read inside ``.paint()``
self.label_str = '{value:,.{digits}f}'.format(
self.label_str = ' {value:,.{digits}f}'.format(
digits=self.digits, value=value).replace(',', ' ')
br = self.boundingRect()
h = br.height()
self.setPos(QPointF(
0,
1,
abs_pos.y() - h / 2 - offset
))
self.update()

File diff suppressed because it is too large Load Diff

View File

@ -20,12 +20,15 @@ Trio - Qt integration
Run ``trio`` in guest mode on top of the Qt event loop.
All global Qt runtime settings are mostly defined here.
"""
import os
import signal
from functools import partial
import traceback
from typing import Tuple, Callable, Dict, Any
# Qt specific
import PyQt5 # noqa
import pyqtgraph as pg
from pyqtgraph import QtGui
from PyQt5 import QtCore
from PyQt5.QtCore import (
@ -37,6 +40,12 @@ import tractor
from outcome import Error
# pyqtgraph global config
# might as well enable this for now?
pg.useOpenGL = True
pg.enableExperimental = True
# singleton app per actor
_qt_app: QtGui.QApplication = None
_qt_win: QtGui.QMainWindow = None
@ -67,6 +76,17 @@ class MainWindow(QtGui.QMainWindow):
self.setMinimumSize(*self.size)
self.setWindowTitle(self.title)
def closeEvent(
self,
event: 'QCloseEvent'
) -> None:
"""Cancel the root actor asap.
"""
# raising KBI seems to get intercepted by by Qt so just use the
# system.
os.kill(os.getpid(), signal.SIGINT)
def run_qtractor(
func: Callable,
@ -115,11 +135,15 @@ def run_qtractor(
def done_callback(outcome):
print(f"Outcome: {outcome}")
if isinstance(outcome, Error):
exc = outcome.error
traceback.print_exception(type(exc), exc, exc.__traceback__)
if isinstance(outcome.error, KeyboardInterrupt):
# make it kinda look like ``trio``
print("Terminated!")
else:
traceback.print_exception(type(exc), exc, exc.__traceback__)
app.quit()
@ -154,6 +178,7 @@ def run_qtractor(
main,
run_sync_soon_threadsafe=run_sync_soon_threadsafe,
done_callback=done_callback,
# restrict_keyboard_interrupt_to_checkpoints=True,
)
window.main_widget = main_widget

View File

@ -17,39 +17,44 @@
"""
Chart graphics for displaying a slew of different data types.
"""
# import time
import inspect
from typing import List, Optional, Tuple
import numpy as np
import pyqtgraph as pg
# from numba import jit, float64, optional, int64
from numba import jit, float64, int64 # , optional
# from numba import types as ntypes
from PyQt5 import QtCore, QtGui
from PyQt5.QtCore import QLineF, QPointF
# from .._profile import timeit
from .._profile import timeit
# from ..data._source import numba_ohlc_dtype
from ._style import (
_xaxis_at,
hcolor,
_font,
_down_2_font_inches_we_like,
)
from ._axes import YAxisLabel, XAxisLabel, YSticky
# XXX: these settings seem to result in really decent mouse scroll
# latency (in terms of perceived lag in cross hair) so really be sure
# there's an improvement if you want to change it.
_mouse_rate_limit = 60 # calc current screen refresh rate?
# there's an improvement if you want to change it!
_mouse_rate_limit = 60 # TODO; should we calc current screen refresh rate?
_debounce_delay = 1 / 2e3
_ch_label_opac = 1
# TODO: we need to handle the case where index is outside
# the underlying datums range
class LineDot(pg.CurvePoint):
def __init__(
self,
curve: pg.PlotCurveItem,
index: int,
plot: 'ChartPlotWidget',
pos=None,
size: int = 2, # in pxs
color: str = 'default_light',
@ -61,6 +66,7 @@ class LineDot(pg.CurvePoint):
pos=pos,
rotate=False,
)
self._plot = plot
# TODO: get pen from curve if not defined?
cdefault = hcolor(color)
@ -80,6 +86,31 @@ class LineDot(pg.CurvePoint):
# keep a static size
self.setFlag(self.ItemIgnoresTransformations)
def event(
self,
ev: QtCore.QEvent,
) -> None:
# print((ev, type(ev)))
if not isinstance(ev, QtCore.QDynamicPropertyChangeEvent) or self.curve() is None:
return False
# if ev.propertyName() == 'index':
# print(ev)
# # self.setProperty
(x, y) = self.curve().getData()
index = self.property('index')
# first = self._plot._ohlc[0]['index']
# first = x[0]
# i = index - first
i = index - x[0]
if i > 0 and i < len(y):
newPos = (index, y[i])
QtGui.QGraphicsItem.setPos(self, *newPos)
return True
return False
_corner_anchors = {
'top': 0,
@ -91,8 +122,9 @@ _corner_anchors = {
_corner_margins = {
('top', 'left'): (-4, -5),
('top', 'right'): (4, -5),
('bottom', 'left'): (-4, 5),
('bottom', 'right'): (4, 5),
('bottom', 'left'): (-4, lambda font_size: font_size * 2),
('bottom', 'right'): (4, lambda font_size: font_size * 2),
}
@ -109,7 +141,10 @@ class ContentsLabel(pg.LabelItem):
font_size: Optional[int] = None,
) -> None:
font_size = font_size or _font.font.pixelSize()
super().__init__(justify=justify_text, size=f'{str(font_size)}px')
super().__init__(
justify=justify_text,
size=f'{str(font_size)}px'
)
# anchor to viewbox
self.setParentItem(chart._vb)
@ -120,6 +155,10 @@ class ContentsLabel(pg.LabelItem):
index = (_corner_anchors[h], _corner_anchors[v])
margins = _corner_margins[(v, h)]
ydim = margins[1]
if inspect.isfunction(margins[1]):
margins = margins[0], ydim(font_size)
self.anchor(itemPos=index, parentPos=index, offset=margins)
def update_from_ohlc(
@ -129,15 +168,19 @@ class ContentsLabel(pg.LabelItem):
array: np.ndarray,
) -> None:
# this being "html" is the dumbest shit :eyeroll:
first = array[0]['index']
self.setText(
"<b>i</b>:{index}<br/>"
"<b>O</b>:{}<br/>"
"<b>H</b>:{}<br/>"
"<b>L</b>:{}<br/>"
"<b>C</b>:{}<br/>"
"<b>V</b>:{}".format(
# *self._array[index].item()[2:8],
*array[index].item()[2:8],
"<b>V</b>:{}<br/>"
"<b>wap</b>:{}".format(
*array[index - first][
['open', 'high', 'low', 'close', 'volume', 'bar_wap']
],
name=name,
index=index,
)
@ -149,8 +192,10 @@ class ContentsLabel(pg.LabelItem):
index: int,
array: np.ndarray,
) -> None:
data = array[index][name]
self.setText(f"{name}: {data:.2f}")
first = array[0]['index']
if index < array[-1]['index'] and index > first:
data = array[index - first][name]
self.setText(f"{name}: {data:.2f}")
class CrossHair(pg.GraphicsObject):
@ -246,7 +291,7 @@ class CrossHair(pg.GraphicsObject):
) -> LineDot:
# if this plot contains curves add line dot "cursors" to denote
# the current sample under the mouse
cursor = LineDot(curve, index=len(plot._array))
cursor = LineDot(curve, index=plot._ohlc[-1]['index'], plot=plot)
plot.addItem(cursor)
self.graphics[plot].setdefault('cursors', []).append(cursor)
return cursor
@ -308,8 +353,9 @@ class CrossHair(pg.GraphicsObject):
plot.update_contents_labels(ix)
# update all subscribed curve dots
# first = plot._ohlc[0]['index']
for cursor in opts.get('cursors', ()):
cursor.setIndex(ix)
cursor.setIndex(ix) # - first)
# update the label on the bottom of the crosshair
self.xaxis_label.update_label(
@ -332,96 +378,127 @@ class CrossHair(pg.GraphicsObject):
return self.plots[0].boundingRect()
# @jit(
# # float64[:](
# # float64[:],
# # optional(float64),
# # optional(int16)
# # ),
# nopython=True,
# nogil=True
# )
def _mk_lines_array(data: List, size: int) -> np.ndarray:
"""Create an ndarray to hold lines graphics objects.
def _mk_lines_array(
data: List,
size: int,
elements_step: int = 6,
) -> np.ndarray:
"""Create an ndarray to hold lines graphics info.
"""
return np.zeros_like(
data,
shape=(int(size), 3),
shape=(int(size), elements_step),
dtype=object,
)
# TODO: `numba` this?
def lines_from_ohlc(row: np.ndarray, w: float) -> Tuple[QLineF]:
open, high, low, close, index = row[
['open', 'high', 'low', 'close', 'index']]
# @jit(
# # float64[:](
# # float64[:],
# # optional(float64),
# # optional(int16)
# # ),
# nopython=True,
# nogil=True
# )
def bars_from_ohlc(
# high -> low vertical (body) line
if low != high:
hl = QLineF(index, low, index, high)
else:
# XXX: if we don't do it renders a weird rectangle?
# see below for filtering this later...
hl = None
# NOTE: place the x-coord start as "middle" of the drawing range such
# that the open arm line-graphic is at the left-most-side of
# the index's range according to the view mapping.
# open line
o = QLineF(index - w, open, index, open)
# close line
c = QLineF(index, close, index + w, close)
return [hl, o, c]
@jit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# ntypes.Tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nopython=True,
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
w: float,
start: int = 0,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
"""Generate an array of lines objects from input ohlc data.
"""
lines = _mk_lines_array(data, data.shape[0])
size = int(data.shape[0] * 6)
for i, q in enumerate(data[start:], start=start):
open, high, low, close, index = q[
['open', 'high', 'low', 'close', 'index']]
x = np.zeros(
# data,
shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# high -> low vertical (body) line
if low != high:
hl = QLineF(index, low, index, high)
else:
# XXX: if we don't do it renders a weird rectangle?
# see below for filtering this later...
hl = None
# TODO: report bug for assert @
# /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
for i, q in enumerate(data[start:], start):
# NOTE: place the x-coord start as "middle" of the drawing range such
# that the open arm line-graphic is at the left-most-side of
# the index's range according to the view mapping.
# TODO: ask numba why this doesn't work..
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
# open line
o = QLineF(index - w, open, index, open)
# close line
c = QLineF(index, close, index + w, close)
open = q['open']
high = q['high']
low = q['low']
close = q['close']
index = float64(q['index'])
# indexing here is as per the below comments
lines[i] = (hl, o, c)
istart = i * 6
istop = istart + 6
# XXX: in theory we could get a further speedup by using a flat
# array and avoiding the call to `np.ravel()` below?
# lines[3*i:3*i+3] = (hl, o, c)
# x,y detail the 6 points which connect all vertexes of a ohlc bar
x[istart:istop] = (
index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# XXX: legacy code from candles custom graphics:
# if not _tina_mode:
# else _tina_mode:
# self.lines = lines = np.concatenate(
# [high_to_low, open_sticks, close_sticks])
# use traditional up/down green/red coloring
# long_bars = np.resize(Quotes.close > Quotes.open, len(lines))
# short_bars = np.resize(
# Quotes.close < Quotes.open, len(lines))
# specifies that the first edge is never connected to the
# prior bars last edge thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (0, 1, 1, 1, 1, 1)
# ups = lines[long_bars]
# downs = lines[short_bars]
return x, y, c
# # draw "up" bars
# p.setPen(self.bull_brush)
# p.drawLines(*ups)
# # draw "down" bars
# p.setPen(self.bear_brush)
# p.drawLines(*downs)
# @timeit
def gen_qpath(
data,
start, # XXX: do we need this?
w,
) -> QtGui.QPainterPath:
return lines
x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w)
# TODO: numba the internals of this!
return pg.functions.arrayToQPath(x, y, connect=c)
class BarItems(pg.GraphicsObject):
@ -431,11 +508,10 @@ class BarItems(pg.GraphicsObject):
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43
bars_pen = pg.mkPen(hcolor('bracket'))
# XXX: tina mode, see below
# bull_brush = pg.mkPen('#00cc00')
# bear_brush = pg.mkPen('#fa0000')
# XXX: for the mega-lulz increasing width here increases draw latency...
# so probably don't do it until we figure that out.
bars_pen = pg.mkPen(hcolor('bracket'))
def __init__(
self,
@ -443,95 +519,87 @@ class BarItems(pg.GraphicsObject):
plotitem: 'pg.PlotItem', # noqa
) -> None:
super().__init__()
self.last = QtGui.QPicture()
self.history = QtGui.QPicture()
# TODO: implement updateable pixmap solution
self.last_bar = QtGui.QPicture()
self.path = QtGui.QPainterPath()
# self._h_path = QtGui.QGraphicsPathItem(self.path)
self._pi = plotitem
# self._scene = plotitem.vb.scene()
# self.picture = QtGui.QPixmap(1000, 300)
# plotitem.addItem(self.picture)
# self._pmi = None
# self._pmi = self._scene.addPixmap(self.picture)
self._xrange: Tuple[int, int]
self._yrange: Tuple[float, float]
# XXX: not sure this actually needs to be an array other
# then for the old tina mode calcs for up/down bars below?
# lines container
self.lines = _mk_lines_array([], 50e3)
# self.lines = _mk_lines_array([], 50e3, 6)
# TODO: don't render the full backing array each time
# self._path_data = None
self._last_bar_lines: Optional[Tuple[QLineF, ...]] = None
# track the current length of drawable lines within the larger array
self.index: int = 0
self.start_index: int = 0
self.stop_index: int = 0
# @timeit
def draw_from_data(
self,
data: np.ndarray,
start: int = 0,
):
) -> QtGui.QPainterPath:
"""Draw OHLC datum graphics from a ``np.ndarray``.
This routine is usually only called to draw the initial history.
"""
lines = bars_from_ohlc(data, self.w, start=start)
self.path = gen_qpath(data, start, self.w)
# save graphics for later reference and keep track
# of current internal "last index"
index = len(lines)
self.lines[:index] = lines
self.index = index
# self.start_index = len(data)
index = data['index']
self._xrange = (index[0], index[-1])
self._yrange = (
np.nanmax(data['high']),
np.nanmin(data['low']),
)
# up to last to avoid double draw of last bar
self.draw_lines(just_history=True, iend=self.index - 1)
self.draw_lines(iend=self.index)
self._last_bar_lines = lines_from_ohlc(data[-1], self.w)
# @timeit
def draw_lines(
self,
istart=0,
iend=None,
just_history=False,
# TODO: could get even fancier and only update the single close line?
lines=None,
) -> None:
"""Draw the current line set using the painter.
"""
if just_history:
# draw bars for the "history" picture
iend = iend or self.index - 1
pic = self.history
else:
# draw the last bar
istart = self.index - 1
iend = iend or self.index
pic = self.last
# create pics
# self.draw_history()
self.draw_last_bar()
# use 2d array of lines objects, see conlusion on speed:
# https://stackoverflow.com/a/60089929
flat = np.ravel(self.lines[istart:iend])
# TODO: do this with numba for speed gain:
# https://stackoverflow.com/questions/58422690/filtering-a-numpy-array-what-is-the-best-approach
to_draw = flat[np.where(flat != None)] # noqa
# pre-computing a QPicture object allows paint() to run much
# more quickly, rather than re-drawing the shapes every time.
p = QtGui.QPainter(pic)
p.setPen(self.bars_pen)
# TODO: is there any way to not have to pass all the lines every
# iteration? It seems they won't draw unless it's done this way..
p.drawLines(*to_draw)
p.end()
# XXX: if we ever try using `QPixmap` again...
# if self._pmi is None:
# self._pmi = self.scene().addPixmap(self.picture)
# else:
# self._pmi.setPixmap(self.picture)
# trigger re-render
# trigger render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update
self.update()
return self.path
# def update_ranges(
# self,
# xmn: int,
# xmx: int,
# ymn: float,
# ymx: float,
# ) -> None:
# ...
def draw_last_bar(self) -> None:
"""Currently this draws lines to a cached ``QPicture`` which
is supposed to speed things up on ``.paint()`` calls (which
is a call to ``QPainter.drawPicture()`` but I'm not so sure.
"""
p = QtGui.QPainter(self.last_bar)
p.setPen(self.bars_pen)
p.drawLines(*tuple(filter(bool, self._last_bar_lines)))
p.end()
# @timeit
def update_from_array(
self,
array: np.ndarray,
@ -545,32 +613,65 @@ class BarItems(pg.GraphicsObject):
graphics object, and then update/rerender, but here we're
assuming the prior graphics havent changed (OHLC history rarely
does) so this "should" be simpler and faster.
This routine should be made (transitively) as fast as possible.
"""
index = self.index
length = len(array)
extra = length - index
# index = self.start_index
istart, istop = self._xrange
# start_bar_to_update = index - 100
index = array['index']
first_index, last_index = index[0], index[-1]
# length = len(array)
prepend_length = istart - first_index
append_length = last_index - istop
# TODO: allow mapping only a range of lines thus
# only drawing as many bars as exactly specified.
if prepend_length:
# new history was added and we need to render a new path
new_bars = array[:prepend_length]
prepend_path = gen_qpath(new_bars, 0, self.w)
# XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# y value not matching the first value from
# array[prepend_length + 1] ???
# update path
old_path = self.path
self.path = prepend_path
self.path.addPath(old_path)
if append_length:
# generate new lines objects for updatable "current bar"
self._last_bar_lines = lines_from_ohlc(array[-1], self.w)
self.draw_last_bar()
if extra > 0:
# generate new graphics to match provided array
new = array[index:index + extra]
lines = bars_from_ohlc(new, self.w)
bars_added = len(lines)
self.lines[index:index + bars_added] = lines
self.index += bars_added
# path appending logic:
# we need to get the previous "current bar(s)" for the time step
# and convert it to a sub-path to append to the historical set
# new_bars = array[istop - 1:istop + append_length - 1]
new_bars = array[-append_length - 1:-1]
append_path = gen_qpath(new_bars, 0, self.w)
self.path.moveTo(float(istop - self.w), float(new_bars[0]['open']))
self.path.addPath(append_path)
# start_bar_to_update = index - bars_added
self.draw_lines(just_history=True)
if just_history:
return
self._xrange = first_index, last_index
# current bar update
if just_history:
self.update()
return
# last bar update
i, o, h, l, last, v = array[-1][
['index', 'open', 'high', 'low', 'close', 'volume']
]
assert i == self.index - 1
body, larm, rarm = self.lines[i]
# assert i == self.start_index - 1
assert i == last_index
body, larm, rarm = self._last_bar_lines
# XXX: is there a faster way to modify this?
rarm.setLine(rarm.x1(), last, rarm.x2(), last)
@ -579,16 +680,26 @@ class BarItems(pg.GraphicsObject):
if l != h: # noqa
if body is None:
body = self.lines[index - 1][0] = QLineF(i, l, i, h)
body = self._last_bar_lines[0] = QLineF(i, l, i, h)
else:
# update body
body.setLine(i, l, i, h)
else:
# XXX: h == l -> remove any HL line to avoid render bug
if body is not None:
body = self.lines[index - 1][0] = None
self.draw_lines(just_history=False)
# XXX: pretty sure this is causing an issue where the bar has
# a large upward move right before the next sample and the body
# is getting set to None since the next bar is flat but the shm
# array index update wasn't read by the time this code runs. Iow
# we're doing this removal of the body for a bar index that is
# now out of date / from some previous sample. It's weird
# though because i've seen it do this to bars i - 3 back?
# else:
# # XXX: h == l -> remove any HL line to avoid render bug
# if body is not None:
# body = self.lines[index - 1][0] = None
self.draw_last_bar()
self.update()
# @timeit
def paint(self, p, opt, widget):
@ -606,33 +717,36 @@ class BarItems(pg.GraphicsObject):
# as is necesarry for what's in "view". Not sure if this will
# lead to any perf gains other then when zoomed in to less bars
# in view.
p.drawPicture(0, 0, self.history)
p.drawPicture(0, 0, self.last)
p.drawPicture(0, 0, self.last_bar)
# TODO: if we can ever make pixmaps work...
# p.drawPixmap(0, 0, self.picture)
# self._pmi.setPixmap(self.picture)
# print(self.scene())
# profiler('bars redraw:')
p.setPen(self.bars_pen)
p.drawPath(self.path)
# @timeit
def boundingRect(self):
# TODO: can we do rect caching to make this faster?
# Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
# TODO: Can we do rect caching to make this faster
# like `pg.PlotCurveItem` does? In theory it's just
# computing max/min stuff again like we do in the udpate loop
# anyway. Not really sure it's necessary since profiling already
# shows this method is faf.
# boundingRect _must_ indicate the entire area that will be
# drawn on or else we will get artifacts and possibly crashing.
# (in this case, QPicture does all the work of computing the
# bounding rect for us).
# compute aggregate bounding rectangle
lb = self.last.boundingRect()
hb = self.history.boundingRect()
lb = self.last_bar.boundingRect()
hb = self.path.boundingRect()
return QtCore.QRectF(
# top left
QtCore.QPointF(hb.topLeft()),
# total size
QtCore.QSizeF(lb.size() + hb.size())
QtCore.QSizeF(QtCore.QSizeF(lb.size()) + hb.size())
# QtCore.QSizeF(lb.size() + hb.size())
)
@ -785,7 +899,7 @@ class L1Labels:
chart: 'ChartPlotWidget', # noqa
digits: int = 2,
size_digits: int = 0,
font_size_inches: float = 4 / 53.,
font_size_inches: float = _down_2_font_inches_we_like,
) -> None:
self.chart = chart
@ -839,7 +953,9 @@ def level_line(
digits: int = 1,
# size 4 font on 4k screen scaled down, so small-ish.
font_size_inches: float = 4 / 53.,
font_size_inches: float = _down_2_font_inches_we_like,
show_label: bool = True,
**linelabelkwargs
) -> LevelLine:
@ -859,6 +975,7 @@ def level_line(
**linelabelkwargs
)
label.update_from_data(0, level)
# TODO: can we somehow figure out a max value from the parent axis?
label._size_br_from_str(label.label_str)
@ -874,4 +991,7 @@ def level_line(
chart.plotItem.addItem(line)
if not show_label:
label.hide()
return line

View File

@ -18,6 +18,7 @@
Qt UI styling.
"""
from typing import Optional
import math
import pyqtgraph as pg
from PyQt5 import QtCore, QtGui
@ -27,10 +28,9 @@ from ..log import get_logger
log = get_logger(__name__)
# chart-wide font
# font size 6px / 53 dpi (3x scaled down on 4k hidpi)
_default_font_inches_we_like = 6 / 53 # px / (px / inch) = inch
_down_2_font_inches_we_like = 4 / 53
# chart-wide fonts specified in inches
_default_font_inches_we_like = 6 / 96
_down_2_font_inches_we_like = 5 / 96
class DpiAwareFont:
@ -66,8 +66,12 @@ class DpiAwareFont:
listed in the script in ``snippets/qt_screen_info.py``.
"""
dpi = screen.physicalDotsPerInch()
font_size = round(self._iwl * dpi)
# take the max since scaling can make things ugly in some cases
pdpi = screen.physicalDotsPerInch()
ldpi = screen.logicalDotsPerInch()
dpi = max(pdpi, ldpi)
font_size = math.floor(self._iwl * dpi)
log.info(
f"\nscreen:{screen.name()} with DPI: {dpi}"
f"\nbest font size is {font_size}\n"