Merge pull request #268 from pikers/trade_ratez

Trade ratez
pp_bar_fixes
goodboy 2022-02-10 11:43:56 -05:00 committed by GitHub
commit 14faf2d245
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 848 additions and 235 deletions

View File

@ -1042,6 +1042,7 @@ tick_types = {
# https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume # https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume
48: 'dark_trade', 48: 'dark_trade',
# standard L1 ticks
0: 'bsize', 0: 'bsize',
1: 'bid', 1: 'bid',
2: 'ask', 2: 'ask',
@ -1049,6 +1050,12 @@ tick_types = {
4: 'last', 4: 'last',
5: 'size', 5: 'size',
8: 'volume', 8: 'volume',
# ``ib_insync`` already packs these into
# quotes under the following fields.
# 55: 'trades_per_min', # `'tradeRate'`
# 56: 'vlm_per_min', # `'volumeRate'`
# 89: 'shortable', # `'shortableShares'`
} }
@ -1069,6 +1076,10 @@ def normalize(
new_ticks.append(td) new_ticks.append(td)
tbt = ticker.tickByTicks
if tbt:
print(f'tickbyticks:\n {ticker.tickByTicks}')
ticker.ticks = new_ticks ticker.ticks = new_ticks
# some contracts don't have volume so we may want to calculate # some contracts don't have volume so we may want to calculate
@ -1081,6 +1092,11 @@ def normalize(
# serialize for transport # serialize for transport
data = asdict(ticker) data = asdict(ticker)
# convert named tuples to dicts for transport
tbts = data.get('tickByTicks')
if tbts:
data['tickByTicks'] = [tbt._asdict() for tbt in tbts]
# add time stamps for downstream latency measurements # add time stamps for downstream latency measurements
data['brokerd_ts'] = time.time() data['brokerd_ts'] = time.time()
@ -1263,7 +1279,18 @@ async def _setup_quote_stream(
to_trio: trio.abc.SendChannel, to_trio: trio.abc.SendChannel,
symbol: str, symbol: str,
opts: tuple[int] = ('375', '233', '236'), opts: tuple[int] = (
'375', # RT trade volume (excludes utrades)
'233', # RT trade volume (includes utrades)
'236', # Shortable shares
# these all appear to only be updated every 25s thus
# making them mostly useless and explains why the scanner
# is always slow XD
# '293', # Trade count for day
'294', # Trade rate / minute
'295', # Vlm rate / minute
),
contract: Optional[Contract] = None, contract: Optional[Contract] = None,
) -> trio.abc.ReceiveChannel: ) -> trio.abc.ReceiveChannel:
@ -1281,6 +1308,12 @@ async def _setup_quote_stream(
contract = contract or (await client.find_contract(symbol)) contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
# NOTE: it's batch-wise and slow af but I guess could
# be good for backchecking? Seems to be every 5s maybe?
# ticker: Ticker = client.ib.reqTickByTickData(
# contract, 'Last',
# )
# # define a simple queue push routine that streams quote packets # # define a simple queue push routine that streams quote packets
# # to trio over the ``to_trio`` memory channel. # # to trio over the ``to_trio`` memory channel.
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore

View File

@ -14,27 +14,67 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
""" '''
Stream format enforcement. Stream format enforcement.
"""
from typing import AsyncIterator, Optional, Tuple '''
from itertools import chain
import numpy as np from typing import AsyncIterator
def iterticks( def iterticks(
quote: dict, quote: dict,
types: Tuple[str] = ('trade', 'dark_trade'), types: tuple[str] = (
'trade',
'dark_trade',
),
deduplicate_darks: bool = False,
) -> AsyncIterator: ) -> AsyncIterator:
''' '''
Iterate through ticks delivered per quote cycle. Iterate through ticks delivered per quote cycle.
''' '''
if deduplicate_darks:
assert 'dark_trade' in types
# print(f"{quote}\n\n") # print(f"{quote}\n\n")
ticks = quote.get('ticks', ()) ticks = quote.get('ticks', ())
trades = {}
darks = {}
if ticks: if ticks:
# do a first pass and attempt to remove duplicate dark
# trades with the same tick signature.
if deduplicate_darks:
for tick in ticks:
ttype = tick.get('type')
time = tick.get('time', None)
if time:
sig = (
time,
tick['price'],
tick['size']
)
if ttype == 'dark_trade':
darks[sig] = tick
elif ttype == 'trade':
trades[sig] = tick
# filter duplicates
for sig, tick in trades.items():
tick = darks.pop(sig, None)
if tick:
ticks.remove(tick)
# print(f'DUPLICATE {tick}')
# re-insert ticks
ticks.extend(list(chain(trades.values(), darks.values())))
for tick in ticks: for tick in ticks:
# print(f"{quote['symbol']}: {tick}") # print(f"{quote['symbol']}: {tick}")
ttype = tick.get('type') ttype = tick.get('type')

View File

@ -252,7 +252,7 @@ async def sample_and_broadcast(
try: try:
stream.send_nowait((sym, quote)) stream.send_nowait((sym, quote))
except trio.WouldBlock: except trio.WouldBlock:
ctx = getattr(sream, '_ctx', None) ctx = getattr(stream, '_ctx', None)
if ctx: if ctx:
log.warning( log.warning(
f'Feed overrun {bus.brokername} ->' f'Feed overrun {bus.brokername} ->'
@ -371,7 +371,7 @@ async def uniform_rate_send(
# we have a quote already so send it now. # we have a quote already so send it now.
measured_rate = 1 / (time.time() - last_send) # measured_rate = 1 / (time.time() - last_send)
# log.info( # log.info(
# f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}'
# ) # )

View File

@ -18,9 +18,10 @@
NumPy compatible shared memory buffers for real-time IPC streaming. NumPy compatible shared memory buffers for real-time IPC streaming.
""" """
from __future__ import annotations
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from sys import byteorder from sys import byteorder
from typing import List, Tuple, Optional from typing import Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
from multiprocessing import resource_tracker as mantracker from multiprocessing import resource_tracker as mantracker
@ -29,6 +30,7 @@ if _USE_POSIX:
import tractor import tractor
import numpy as np import numpy as np
from pydantic import BaseModel, validator
from ..log import get_logger from ..log import get_logger
from ._source import base_iohlc_dtype from ._source import base_iohlc_dtype
@ -85,26 +87,34 @@ class SharedInt:
shm_unlink(self._shm.name) shm_unlink(self._shm.name)
@dataclass class _Token(BaseModel):
class _Token: '''
"""Internal represenation of a shared memory "token" Internal represenation of a shared memory "token"
which can be used to key a system wide post shm entry. which can be used to key a system wide post shm entry.
"""
'''
class Config:
frozen = True
shm_name: str # this servers as a "key" value shm_name: str # this servers as a "key" value
shm_first_index_name: str shm_first_index_name: str
shm_last_index_name: str shm_last_index_name: str
dtype_descr: List[Tuple[str]] dtype_descr: tuple
def __post_init__(self): @property
# np.array requires a list for dtype def dtype(self) -> np.dtype:
self.dtype_descr = np.dtype(list(map(tuple, self.dtype_descr))).descr return np.dtype(list(map(tuple, self.dtype_descr))).descr
def as_msg(self): def as_msg(self):
return asdict(self) return self.dict()
@classmethod @classmethod
def from_msg(self, msg: dict) -> '_Token': def from_msg(cls, msg: dict) -> _Token:
return msg if isinstance(msg, _Token) else _Token(**msg) if isinstance(msg, _Token):
return msg
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg)
# TODO: this api? # TODO: this api?
@ -127,15 +137,17 @@ def _make_token(
key: str, key: str,
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
) -> _Token: ) -> _Token:
"""Create a serializable token that can be used '''
Create a serializable token that can be used
to access a shared array. to access a shared array.
"""
'''
dtype = base_iohlc_dtype if dtype is None else dtype dtype = base_iohlc_dtype if dtype is None else dtype
return _Token( return _Token(
key, shm_name=key,
key + "_first", shm_first_index_name=key + "_first",
key + "_last", shm_last_index_name=key + "_last",
np.dtype(dtype).descr dtype_descr=np.dtype(dtype).descr
) )
@ -178,10 +190,10 @@ class ShmArray:
@property @property
def _token(self) -> _Token: def _token(self) -> _Token:
return _Token( return _Token(
self._shm.name, shm_name=self._shm.name,
self._first._shm.name, shm_first_index_name=self._first._shm.name,
self._last._shm.name, shm_last_index_name=self._last._shm.name,
self._array.dtype.descr, dtype_descr=tuple(self._array.dtype.descr),
) )
@property @property
@ -402,16 +414,19 @@ def open_shm_array(
def attach_shm_array( def attach_shm_array(
token: Tuple[str, str, Tuple[str, str]], token: tuple[str, str, tuple[str, str]],
size: int = _default_size, size: int = _default_size,
readonly: bool = True, readonly: bool = True,
) -> ShmArray: ) -> ShmArray:
"""Attach to an existing shared memory array previously '''
Attach to an existing shared memory array previously
created by another process using ``open_shared_array``. created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write No new shared mem is allocated but wrapper types for read/write
access are constructed. access are constructed.
"""
'''
token = _Token.from_msg(token) token = _Token.from_msg(token)
key = token.shm_name key = token.shm_name
@ -422,7 +437,7 @@ def attach_shm_array(
shm = SharedMemory(name=key) shm = SharedMemory(name=key)
shmarr = np.ndarray( shmarr = np.ndarray(
(size,), (size,),
dtype=token.dtype_descr, dtype=token.dtype,
buffer=shm.buf buffer=shm.buf
) )
shmarr.setflags(write=int(not readonly)) shmarr.setflags(write=int(not readonly))
@ -470,8 +485,10 @@ def maybe_open_shm_array(
key: str, key: str,
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
**kwargs, **kwargs,
) -> Tuple[ShmArray, bool]:
"""Attempt to attach to a shared memory block using a "key" lookup ) -> tuple[ShmArray, bool]:
'''
Attempt to attach to a shared memory block using a "key" lookup
to registered blocks in the users overall "system" registry to registered blocks in the users overall "system" registry
(presumes you don't have the block's explicit token). (presumes you don't have the block's explicit token).
@ -485,7 +502,8 @@ def maybe_open_shm_array(
If you know the explicit ``_Token`` for your memory segment instead If you know the explicit ``_Token`` for your memory segment instead
use ``attach_shm_array``. use ``attach_shm_array``.
"""
'''
try: try:
# see if we already know this key # see if we already know this key
token = _known_tokens[key] token = _known_tokens[key]

View File

@ -567,7 +567,7 @@ async def open_feed(
shm_token = data['shm_token'] shm_token = data['shm_token']
# XXX: msgspec won't relay through the tuples XD # XXX: msgspec won't relay through the tuples XD
shm_token['dtype_descr'] = list( shm_token['dtype_descr'] = tuple(
map(tuple, shm_token['dtype_descr'])) map(tuple, shm_token['dtype_descr']))
assert shm_token == shm.token # sanity assert shm_token == shm.token # sanity

View File

@ -40,6 +40,8 @@ from tractor.msg import NamespacePath
from ..data._sharedmem import ( from ..data._sharedmem import (
ShmArray, ShmArray,
maybe_open_shm_array, maybe_open_shm_array,
attach_shm_array,
_Token,
) )
from ..log import get_logger from ..log import get_logger
@ -72,6 +74,13 @@ class Fsp:
# - custom function wrappers, # - custom function wrappers,
# https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers
# actor-local map of source flow shm tokens
# + the consuming fsp *to* the consumers output
# shm flow.
_flow_registry: dict[
tuple[_Token, str], _Token,
] = {}
def __init__( def __init__(
self, self,
func: Callable[..., Awaitable], func: Callable[..., Awaitable],
@ -93,7 +102,7 @@ class Fsp:
self.config: dict[str, Any] = config self.config: dict[str, Any] = config
# register with declared set. # register with declared set.
_fsp_registry[self.ns_path] = func _fsp_registry[self.ns_path] = self
@property @property
def name(self) -> str: def name(self) -> str:
@ -111,6 +120,24 @@ class Fsp:
): ):
return self.func(*args, **kwargs) return self.func(*args, **kwargs)
# TODO: lru_cache this? prettty sure it'll work?
def get_shm(
self,
src_shm: ShmArray,
) -> ShmArray:
'''
Provide access to allocated shared mem array
for this "instance" of a signal processor for
the given ``key``.
'''
dst_token = self._flow_registry[
(src_shm._token, self.name)
]
shm = attach_shm_array(dst_token)
return shm
def fsp( def fsp(
wrapped=None, wrapped=None,
@ -132,18 +159,27 @@ def fsp(
return Fsp(wrapped, outputs=(wrapped.__name__,)) return Fsp(wrapped, outputs=(wrapped.__name__,))
def mk_fsp_shm_key(
sym: str,
target: Fsp
) -> str:
uid = tractor.current_actor().uid
return f'{sym}.fsp.{target.name}.{".".join(uid)}'
def maybe_mk_fsp_shm( def maybe_mk_fsp_shm(
sym: str, sym: str,
target: fsp, target: Fsp,
readonly: bool = True, readonly: bool = True,
) -> (ShmArray, bool): ) -> (str, ShmArray, bool):
''' '''
Allocate a single row shm array for an symbol-fsp pair if none Allocate a single row shm array for an symbol-fsp pair if none
exists, otherwise load the shm already existing for that token. exists, otherwise load the shm already existing for that token.
''' '''
uid = tractor.current_actor().uid assert isinstance(sym, str), '`sym` should be file-name-friendly `str`'
# TODO: load output types from `Fsp` # TODO: load output types from `Fsp`
# - should `index` be a required internal field? # - should `index` be a required internal field?
@ -152,7 +188,7 @@ def maybe_mk_fsp_shm(
[(field_name, float) for field_name in target.outputs] [(field_name, float) for field_name in target.outputs]
) )
key = f'{sym}.fsp.{target.name}.{".".join(uid)}' key = mk_fsp_shm_key(sym, target)
shm, opened = maybe_open_shm_array( shm, opened = maybe_open_shm_array(
key, key,
@ -160,4 +196,4 @@ def maybe_mk_fsp_shm(
dtype=fsp_dtype, dtype=fsp_dtype,
readonly=True, readonly=True,
) )
return shm, opened return key, shm, opened

View File

@ -20,7 +20,10 @@ core task logic for processing chains
''' '''
from dataclasses import dataclass from dataclasses import dataclass
from functools import partial from functools import partial
from typing import AsyncIterator, Callable, Optional from typing import (
AsyncIterator, Callable, Optional,
Union,
)
import numpy as np import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
@ -34,8 +37,11 @@ from .. import data
from ..data import attach_shm_array from ..data import attach_shm_array
from ..data.feed import Feed from ..data.feed import Feed
from ..data._sharedmem import ShmArray from ..data._sharedmem import ShmArray
from ._api import Fsp from ._api import (
from ._api import _load_builtins Fsp,
_load_builtins,
_Token,
)
log = get_logger(__name__) log = get_logger(__name__)
@ -96,33 +102,74 @@ async def fsp_compute(
# to the async iterable? it's that or we do some kinda # to the async iterable? it's that or we do some kinda
# async itertools style? # async itertools style?
filter_quotes_by_sym(symbol, quote_stream), filter_quotes_by_sym(symbol, quote_stream),
# XXX: currently the ``ohlcv`` arg
feed.shm, feed.shm,
) )
# Conduct a single iteration of fsp with historical bars input # Conduct a single iteration of fsp with historical bars input
# and get historical output # and get historical output
history_output: Union[
dict[str, np.ndarray], # multi-output case
np.ndarray, # single output case
]
history_output = await out_stream.__anext__() history_output = await out_stream.__anext__()
func_name = func.__name__ func_name = func.__name__
profiler(f'{func_name} generated history') profiler(f'{func_name} generated history')
# build struct array with an 'index' field to push as history # build struct array with an 'index' field to push as history
history = np.zeros(
len(history_output),
dtype=dst.array.dtype
)
# TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
# if the output array is multi-field then push # if the output array is multi-field then push
# each respective field. # each respective field.
fields = getattr(history.dtype, 'fields', None) # await tractor.breakpoint()
if fields: fields = getattr(dst.array.dtype, 'fields', None).copy()
fields.pop('index')
# TODO: nptyping here!
history: Optional[np.ndarray] = None
if fields and len(fields) > 1 and fields:
if not isinstance(history_output, dict):
raise ValueError(
f'`{func_name}` is a multi-output FSP and should yield a '
'`dict[str, np.ndarray]` for history'
)
for key in fields.keys(): for key in fields.keys():
if key in history.dtype.fields: if key in history_output:
history[func_name] = history_output output = history_output[key]
if history is None:
if output is None:
length = len(src.array)
else:
length = len(output)
# using the first output, determine
# the length of the struct-array that
# will be pushed to shm.
history = np.zeros(
length,
dtype=dst.array.dtype
)
if output is None:
continue
history[key] = output
# single-key output stream # single-key output stream
else: else:
if not isinstance(history_output, np.ndarray):
raise ValueError(
f'`{func_name}` is a single output FSP and should yield an '
'`np.ndarray` for history'
)
history = np.zeros(
len(history_output),
dtype=dst.array.dtype
)
history[func_name] = history_output history[func_name] = history_output
# TODO: XXX: # TODO: XXX:
@ -197,6 +244,8 @@ async def cascade(
ns_path: NamespacePath, ns_path: NamespacePath,
shm_registry: dict[str, _Token],
zero_on_step: bool = False, zero_on_step: bool = False,
loglevel: Optional[str] = None, loglevel: Optional[str] = None,
@ -219,9 +268,21 @@ async def cascade(
log.info( log.info(
f'Registered FSP set:\n{lines}' f'Registered FSP set:\n{lines}'
) )
func: Fsp = reg.get(
# update actor local flows table which registers
# readonly "instances" of this fsp for symbol/source
# so that consumer fsps can look it up by source + fsp.
# TODO: ugh i hate this wind/unwind to list over the wire
# but not sure how else to do it.
for (token, fsp_name, dst_token) in shm_registry:
Fsp._flow_registry[
(_Token.from_msg(token), fsp_name)
] = _Token.from_msg(dst_token)
fsp: Fsp = reg.get(
NamespacePath(ns_path) NamespacePath(ns_path)
) )
func = fsp.func
if not func: if not func:
# TODO: assume it's a func target path # TODO: assume it's a func target path

View File

@ -170,6 +170,32 @@ def _wma(
return np.convolve(signal, weights, 'valid') return np.convolve(signal, weights, 'valid')
@fsp
async def wma(
source, #: AsyncStream[np.ndarray],
length: int,
ohlcv: np.ndarray, # price time-frame "aware"
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
'''
Streaming weighted moving average.
``weights`` is a sequence of already scaled values. As an example
for the WMA often found in "techincal analysis":
``weights = np.arange(1, N) * N*(N-1)/2``.
'''
# deliver historical output as "first yield"
yield _wma(ohlcv.array['close'], length)
# begin real-time section
async for quote in source:
for tick in iterticks(quote, type='trade'):
yield _wma(ohlcv.last(length))
@fsp @fsp
async def rsi( async def rsi(
@ -224,29 +250,3 @@ async def rsi(
down_ema_last=last_down_ema_close, down_ema_last=last_down_ema_close,
) )
yield rsi_out[-1:] yield rsi_out[-1:]
@fsp
async def wma(
source, #: AsyncStream[np.ndarray],
length: int,
ohlcv: np.ndarray, # price time-frame "aware"
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
'''
Streaming weighted moving average.
``weights`` is a sequence of already scaled values. As an example
for the WMA often found in "techincal analysis":
``weights = np.arange(1, N) * N*(N-1)/2``.
'''
# deliver historical output as "first yield"
yield _wma(ohlcv.array['close'], length)
# begin real-time section
async for quote in source:
for tick in iterticks(quote, type='trade'):
yield _wma(ohlcv.last(length))

View File

@ -22,17 +22,25 @@ from tractor.trionics._broadcast import AsyncReceiver
from ._api import fsp from ._api import fsp
from ..data._normalize import iterticks from ..data._normalize import iterticks
from ..data._sharedmem import ShmArray from ..data._sharedmem import ShmArray
from ._momo import _wma
from ..log import get_logger
log = get_logger(__name__)
# NOTE: is the same as our `wma` fsp, and if so which one is faster?
# Ohhh, this is an IIR style i think? So it has an anchor point
# effectively instead of a moving window/FIR style?
def wap( def wap(
signal: np.ndarray, signal: np.ndarray,
weights: np.ndarray, weights: np.ndarray,
) -> np.ndarray: ) -> np.ndarray:
"""Weighted average price from signal and weights. '''
Weighted average price from signal and weights.
""" '''
cum_weights = np.cumsum(weights) cum_weights = np.cumsum(weights)
cum_weighted_input = np.cumsum(signal * weights) cum_weighted_input = np.cumsum(signal * weights)
@ -89,7 +97,10 @@ async def tina_vwap(
# vwap_tot = h_vwap[-1] # vwap_tot = h_vwap[-1]
async for quote in source: async for quote in source:
for tick in iterticks(quote, types=['trade']): for tick in iterticks(
quote,
types=['trade'],
):
# c, h, l, v = ohlcv.array[-1][ # c, h, l, v = ohlcv.array[-1][
# ['closes', 'high', 'low', 'volume'] # ['closes', 'high', 'low', 'volume']
@ -107,8 +118,12 @@ async def tina_vwap(
@fsp( @fsp(
outputs=('dolla_vlm', 'dark_vlm'), outputs=(
ohlc=False, 'dolla_vlm',
'dark_vlm',
'trade_count',
'dark_trade_count',
),
curve_style='step', curve_style='step',
) )
async def dolla_vlm( async def dolla_vlm(
@ -132,14 +147,24 @@ async def dolla_vlm(
v = a['volume'] v = a['volume']
# on first iteration yield history # on first iteration yield history
yield chl3 * v yield {
'dolla_vlm': chl3 * v,
'dark_vlm': None,
}
i = ohlcv.index i = ohlcv.index
output = vlm = 0 dvlm = vlm = 0
dvlm = 0 dark_trade_count = trade_count = 0
async for quote in source: async for quote in source:
for tick in iterticks(quote): for tick in iterticks(
quote,
types=(
'trade',
'dark_trade',
),
deduplicate_darks=True,
):
# this computes tick-by-tick weightings from here forward # this computes tick-by-tick weightings from here forward
size = tick['size'] size = tick['size']
@ -148,24 +173,30 @@ async def dolla_vlm(
li = ohlcv.index li = ohlcv.index
if li > i: if li > i:
i = li i = li
vlm = 0 trade_count = dark_trade_count = dvlm = vlm = 0
dvlm = 0
# TODO: for marginned instruments (futes, etfs?) we need to # TODO: for marginned instruments (futes, etfs?) we need to
# show the margin $vlm by multiplying by whatever multiplier # show the margin $vlm by multiplying by whatever multiplier
# is reported in the sym info. # is reported in the sym info.
ttype = tick.get('type') ttype = tick.get('type')
if ttype == 'dark_trade': if ttype == 'dark_trade':
print(f'dark_trade: {tick}')
key = 'dark_vlm'
dvlm += price * size dvlm += price * size
output = dvlm yield 'dark_vlm', dvlm
dark_trade_count += 1
yield 'dark_trade_count', dark_trade_count
# print(f'{dark_trade_count}th dark_trade: {tick}')
else: else:
key = 'dolla_vlm' # print(f'vlm: {tick}')
vlm += price * size vlm += price * size
output = vlm yield 'dolla_vlm', vlm
trade_count += 1
yield 'trade_count', trade_count
# TODO: plot both to compare? # TODO: plot both to compare?
# c, h, l, v = ohlcv.last()[ # c, h, l, v = ohlcv.last()[
@ -174,4 +205,154 @@ async def dolla_vlm(
# tina_lvlm = c+h+l/3 * v # tina_lvlm = c+h+l/3 * v
# print(f' tinal vlm: {tina_lvlm}') # print(f' tinal vlm: {tina_lvlm}')
yield key, output
@fsp(
# TODO: eventually I guess we should support some kinda declarative
# graphics config syntax per output yah? That seems like a clean way
# to let users configure things? Not sure how exactly to offer that
# api as well as how to expose such a thing *inside* the body?
outputs=(
# pulled verbatim from `ib` for now
'1m_trade_rate',
'1m_vlm_rate',
# our own instantaneous rate calcs which are all
# parameterized by a samples count (bars) period
'trade_rate',
'dark_trade_rate',
'dvlm_rate',
'dark_dvlm_rate',
),
curve_style='line',
)
async def flow_rates(
source: AsyncReceiver[dict],
ohlcv: ShmArray, # OHLC sampled history
# TODO (idea): a dynamic generic / boxing type that can be updated by other
# FSPs, user input, and possibly any general event stream in
# real-time. Hint: ideally implemented with caching until mutated
# ;)
period: 'Param[int]' = 6, # noqa
# TODO: support other means by providing a map
# to weights `partial()`-ed with `wma()`?
mean_type: str = 'arithmetic',
# TODO (idea): a generic for declaring boxed fsps much like ``pytest``
# fixtures? This probably needs a lot of thought if we want to offer
# a higher level composition syntax eventually (oh right gotta make
# an issue for that).
# ideas for how to allow composition / intercalling:
# - offer a `Fsp.get_history()` to do the first yield output?
# * err wait can we just have shm access directly?
# - how would it work if some consumer fsp wanted to dynamically
# change params which are input to the callee fsp? i guess we could
# lazy copy in that case?
# dvlm: 'Fsp[dolla_vlm]'
) -> AsyncIterator[
tuple[str, Union[np.ndarray, float]],
]:
# generally no history available prior to real-time calcs
yield {
# from ib
'1m_trade_rate': None,
'1m_vlm_rate': None,
'trade_rate': None,
'dark_trade_rate': None,
'dvlm_rate': None,
'dark_dvlm_rate': None,
}
# TODO: 3.10 do ``anext()``
quote = await source.__anext__()
# ltr = 0
# lvr = 0
tr = quote.get('tradeRate')
yield '1m_trade_rate', tr or 0
vr = quote.get('volumeRate')
yield '1m_vlm_rate', vr or 0
yield 'trade_rate', 0
yield 'dark_trade_rate', 0
yield 'dvlm_rate', 0
yield 'dark_dvlm_rate', 0
# NOTE: in theory we could dynamically allocate a cascade based on
# this call but not sure if that's too "dynamic" in terms of
# validating cascade flows from message typing perspective.
# attach to ``dolla_vlm`` fsp running
# on this same source flow.
dvlm_shm = dolla_vlm.get_shm(ohlcv)
# precompute arithmetic mean weights (all ones)
seq = np.full((period,), 1)
weights = seq / seq.sum()
async for quote in source:
if not quote:
log.error("OH WTF NO QUOTE IN FSP")
continue
# dvlm_wma = _wma(
# dvlm_shm.array['dolla_vlm'],
# period,
# weights=weights,
# )
# yield 'dvlm_rate', dvlm_wma[-1]
if period > 1:
trade_rate_wma = _wma(
dvlm_shm.array['trade_count'],
period,
weights=weights,
)
trade_rate = trade_rate_wma[-1]
# print(trade_rate)
yield 'trade_rate', trade_rate
else:
# instantaneous rate per sample step
count = dvlm_shm.array['trade_count'][-1]
yield 'trade_rate', count
# TODO: skip this if no dark vlm is declared
# by symbol info (eg. in crypto$)
# dark_dvlm_wma = _wma(
# dvlm_shm.array['dark_vlm'],
# period,
# weights=weights,
# )
# yield 'dark_dvlm_rate', dark_dvlm_wma[-1]
if period > 1:
dark_trade_rate_wma = _wma(
dvlm_shm.array['dark_trade_count'],
period,
weights=weights,
)
yield 'dark_trade_rate', dark_trade_rate_wma[-1]
else:
# instantaneous rate per sample step
dark_count = dvlm_shm.array['dark_trade_count'][-1]
yield 'dark_trade_rate', dark_count
# XXX: ib specific schema we should
# probably pre-pack ourselves.
# tr = quote.get('tradeRate')
# if tr is not None and tr != ltr:
# # print(f'trade rate: {tr}')
# yield '1m_trade_rate', tr
# ltr = tr
# vr = quote.get('volumeRate')
# if vr is not None and vr != lvr:
# # print(f'vlm rate: {vr}')
# yield '1m_vlm_rate', vr
# lvr = vr

View File

@ -44,10 +44,14 @@ class Axis(pg.AxisItem):
self, self,
linkedsplits, linkedsplits,
typical_max_str: str = '100 000.000', typical_max_str: str = '100 000.000',
text_color: str = 'bracket',
**kwargs **kwargs
) -> None: ) -> None:
super().__init__(**kwargs) super().__init__(
# textPen=textPen,
**kwargs
)
# XXX: pretty sure this makes things slower # XXX: pretty sure this makes things slower
# self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) # self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
@ -74,15 +78,28 @@ class Axis(pg.AxisItem):
}) })
self.setTickFont(_font.font) self.setTickFont(_font.font)
# NOTE: this is for surrounding "border" # NOTE: this is for surrounding "border"
self.setPen(_axis_pen) self.setPen(_axis_pen)
# this is the text color # this is the text color
self.setTextPen(_axis_pen) # self.setTextPen(pg.mkPen(hcolor(text_color)))
self.text_color = text_color
self.typical_br = _font._qfm.boundingRect(typical_max_str) self.typical_br = _font._qfm.boundingRect(typical_max_str)
# size the pertinent axis dimension to a "typical value" # size the pertinent axis dimension to a "typical value"
self.size_to_values() self.size_to_values()
@property
def text_color(self) -> str:
return self._text_color
@text_color.setter
def text_color(self, text_color: str) -> None:
self.setTextPen(pg.mkPen(hcolor(text_color)))
self._text_color = text_color
def size_to_values(self) -> None: def size_to_values(self) -> None:
pass pass
@ -109,7 +126,8 @@ class PriceAxis(Axis):
def set_title( def set_title(
self, self,
title: str, title: str,
view: Optional[ChartView] = None view: Optional[ChartView] = None,
color: Optional[str] = None,
) -> Label: ) -> Label:
''' '''
@ -123,7 +141,7 @@ class PriceAxis(Axis):
label = self.title = Label( label = self.title = Label(
view=view or self.linkedView(), view=view or self.linkedView(),
fmt_str=title, fmt_str=title,
color='bracket', color=color or self.text_color,
parent=self, parent=self,
# update_on_range_change=False, # update_on_range_change=False,
) )

View File

@ -33,6 +33,7 @@ from PyQt5.QtWidgets import (
import numpy as np import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
import trio import trio
from pydantic import BaseModel
from ._axes import ( from ._axes import (
DynamicDateAxis, DynamicDateAxis,
@ -614,17 +615,30 @@ class LinkedSplits(QWidget):
cpw.sidepane.setMinimumWidth(sp_w) cpw.sidepane.setMinimumWidth(sp_w)
cpw.sidepane.setMaximumWidth(sp_w) cpw.sidepane.setMaximumWidth(sp_w)
# import pydantic
# class Graphics(pydantic.BaseModel): # class FlowsTable(pydantic.BaseModel):
# ''' # '''
# Data-AGGRegate: high level API onto multiple (categorized) # Data-AGGRegate: high level API onto multiple (categorized)
# ``ShmArray``s with high level processing routines for # ``Flow``s with high level processing routines for
# graphics computations and display. # multi-graphics computations and display.
# ''' # '''
# arrays: dict[str, np.ndarray] = {} # flows: dict[str, np.ndarray] = {}
# graphics: dict[str, pg.GraphicsObject] = {}
class Flow(BaseModel):
'''
(FinancialSignal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level
access and control.
'''
class Config:
arbitrary_types_allowed = True
name: str
plot: pg.PlotItem
shm: Optional[ShmArray] = None # may be filled in "later"
class ChartPlotWidget(pg.PlotWidget): class ChartPlotWidget(pg.PlotWidget):
@ -721,8 +735,9 @@ class ChartPlotWidget(pg.PlotWidget):
self.data_key: array, self.data_key: array,
} }
self._graphics = {} # registry of underlying graphics self._graphics = {} # registry of underlying graphics
# registry of overlay curve names # registry of overlay curve names
self._overlays: dict[str, ShmArray] = {} self._flows: dict[str, Flow] = {}
self._feeds: dict[Symbol, Feed] = {} self._feeds: dict[Symbol, Feed] = {}
@ -980,9 +995,6 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: this probably needs its own method? # TODO: this probably needs its own method?
if overlay: if overlay:
# anchor_at = ('bottom', 'left')
self._overlays[name] = None
if isinstance(overlay, pg.PlotItem): if isinstance(overlay, pg.PlotItem):
if overlay not in self.pi_overlay.overlays: if overlay not in self.pi_overlay.overlays:
raise RuntimeError( raise RuntimeError(
@ -990,6 +1002,9 @@ class ChartPlotWidget(pg.PlotWidget):
) )
pi = overlay pi = overlay
# anchor_at = ('bottom', 'left')
self._flows[name] = Flow(name=name, plot=pi)
else: else:
# anchor_at = ('top', 'left') # anchor_at = ('top', 'left')
@ -1062,7 +1077,7 @@ class ChartPlotWidget(pg.PlotWidget):
assert len(array) assert len(array)
data_key = array_key or graphics_name data_key = array_key or graphics_name
if graphics_name not in self._overlays: if graphics_name not in self._flows:
self._arrays[self.name] = array self._arrays[self.name] = array
else: else:
self._arrays[data_key] = array self._arrays[data_key] = array
@ -1164,9 +1179,15 @@ class ChartPlotWidget(pg.PlotWidget):
# f"begin: {begin}, end: {end}, extra: {extra}" # f"begin: {begin}, end: {end}, extra: {extra}"
# ) # )
a = self._arrays[name or self.name] # TODO: here we should instead look up the ``Flow.shm.array``
# and read directly from shm to avoid copying to memory first
# and then reading it again here.
a = self._arrays.get(name or self.name)
if a is None:
return None
ifirst = a[0]['index'] ifirst = a[0]['index']
bars = a[lbar - ifirst:rbar - ifirst + 1] bars = a[lbar - ifirst:(rbar - ifirst) + 1]
if not len(bars): if not len(bars):
# likely no data loaded yet or extreme scrolling? # likely no data loaded yet or extreme scrolling?

View File

@ -253,7 +253,7 @@ class ContentsLabels:
and index < array[-1]['index'] and index < array[-1]['index']
): ):
# out of range # out of range
print('out of range?') print('WTF out of range?')
continue continue
# array = chart._arrays[name] # array = chart._arrays[name]
@ -550,17 +550,20 @@ class Cursor(pg.GraphicsObject):
for cursor in opts.get('cursors', ()): for cursor in opts.get('cursors', ()):
cursor.setIndex(ix) cursor.setIndex(ix)
# update the label on the bottom of the crosshair # Update the label on the bottom of the crosshair.
axes = plot.plotItem.axes
# TODO: make this an up-front calc that we update # TODO: make this an up-front calc that we update
# on axis-widget resize events. # on axis-widget resize events instead of on every mouse
# update cylce.
# left axis offset width for calcuating # left axis offset width for calcuating
# absolute x-axis label placement. # absolute x-axis label placement.
left_axis_width = 0 left_axis_width = 0
left = axes.get('left') if len(plot.pi_overlay.overlays):
if left: # breakpoint()
left_axis_width = left['item'].width() lefts = plot.pi_overlay.get_axes('left')
if lefts:
for left in lefts:
left_axis_width += left.width()
# map back to abs (label-local) coordinates # map back to abs (label-local) coordinates
self.xaxis_label.update_label( self.xaxis_label.update_label(

View File

@ -24,6 +24,7 @@ import numpy as np
import pyqtgraph as pg import pyqtgraph as pg
from PyQt5 import QtGui, QtWidgets from PyQt5 import QtGui, QtWidgets
from PyQt5.QtCore import ( from PyQt5.QtCore import (
Qt,
QLineF, QLineF,
QSizeF, QSizeF,
QRectF, QRectF,
@ -85,6 +86,14 @@ def step_path_arrays_from_1d(
return x_out, y_out return x_out, y_out
_line_styles: dict[str, int] = {
'solid': Qt.PenStyle.SolidLine,
'dash': Qt.PenStyle.DashLine,
'dot': Qt.PenStyle.DotLine,
'dashdot': Qt.PenStyle.DashDotLine,
}
# TODO: got a feeling that dropping this inheritance gets us even more speedups # TODO: got a feeling that dropping this inheritance gets us even more speedups
class FastAppendCurve(pg.PlotCurveItem): class FastAppendCurve(pg.PlotCurveItem):
''' '''
@ -106,6 +115,8 @@ class FastAppendCurve(pg.PlotCurveItem):
step_mode: bool = False, step_mode: bool = False,
color: str = 'default_lightest', color: str = 'default_lightest',
fill_color: Optional[str] = None, fill_color: Optional[str] = None,
style: str = 'solid',
name: Optional[str] = None,
**kwargs **kwargs
@ -114,14 +125,22 @@ class FastAppendCurve(pg.PlotCurveItem):
# TODO: we can probably just dispense with the parent since # TODO: we can probably just dispense with the parent since
# we're basically only using the pen setting now... # we're basically only using the pen setting now...
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._name = name
self._xrange: tuple[int, int] = self.dataBounds(ax=0) self._xrange: tuple[int, int] = self.dataBounds(ax=0)
# all history of curve is drawn in single px thickness # all history of curve is drawn in single px thickness
self.setPen(hcolor(color)) pen = pg.mkPen(hcolor(color))
pen.setStyle(_line_styles[style])
if 'dash' in style:
pen.setDashPattern([8, 3])
self.setPen(pen)
# last segment is drawn in 2px thickness for emphasis # last segment is drawn in 2px thickness for emphasis
self.last_step_pen = pg.mkPen(hcolor(color), width=2) # self.last_step_pen = pg.mkPen(hcolor(color), width=2)
self.last_step_pen = pg.mkPen(pen, width=2)
self._last_line: QLineF = None self._last_line: QLineF = None
self._last_step_rect: QRectF = None self._last_step_rect: QRectF = None
@ -135,6 +154,11 @@ class FastAppendCurve(pg.PlotCurveItem):
# interactions slower (such as zooming) and if so maybe if/when # interactions slower (such as zooming) and if so maybe if/when
# we implement a "history" mode for the view we disable this in # we implement a "history" mode for the view we disable this in
# that mode? # that mode?
if step_mode:
# don't enable caching by default for the case where the
# only thing drawn is the "last" line segment which can
# have a weird artifact where it won't be fully drawn to its
# endpoint (something we saw on trade rate curves)
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
def update_from_array( def update_from_array(
@ -245,10 +269,13 @@ class FastAppendCurve(pg.PlotCurveItem):
# self.path.connectPath(append_path) # self.path.connectPath(append_path)
path.connectPath(append_path) path.connectPath(append_path)
# XXX: pretty annoying but, without this there's little self.disable_cache()
# artefacts on the append updates to the curve... flip_cache = True
self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
self.prepareGeometryChange() if (
self._step_mode
):
self.disable_cache()
flip_cache = True flip_cache = True
# print(f"update br: {self.path.boundingRect()}") # print(f"update br: {self.path.boundingRect()}")
@ -273,6 +300,7 @@ class FastAppendCurve(pg.PlotCurveItem):
x_last + 0.5, y_last x_last + 0.5, y_last
) )
else: else:
# print((x[-1], y_last))
self._last_line = QLineF( self._last_line = QLineF(
x[-2], y[-2], x[-2], y[-2],
x[-1], y_last x[-1], y_last
@ -287,6 +315,12 @@ class FastAppendCurve(pg.PlotCurveItem):
# XXX: seems to be needed to avoid artifacts (see above). # XXX: seems to be needed to avoid artifacts (see above).
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
def disable_cache(self) -> None:
# XXX: pretty annoying but, without this there's little
# artefacts on the append updates to the curve...
self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
self.prepareGeometryChange()
def boundingRect(self): def boundingRect(self):
if self.path is None: if self.path is None:
return QtGui.QPainterPath().boundingRect() return QtGui.QPainterPath().boundingRect()
@ -323,6 +357,7 @@ class FastAppendCurve(pg.PlotCurveItem):
p: QtGui.QPainter, p: QtGui.QPainter,
opt: QtWidgets.QStyleOptionGraphicsItem, opt: QtWidgets.QStyleOptionGraphicsItem,
w: QtWidgets.QWidget w: QtWidgets.QWidget
) -> None: ) -> None:
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled()) profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
@ -340,11 +375,11 @@ class FastAppendCurve(pg.PlotCurveItem):
# p.drawPath(self.path) # p.drawPath(self.path)
# profiler('.drawPath()') # profiler('.drawPath()')
# else:
p.setPen(self.last_step_pen) p.setPen(self.last_step_pen)
p.drawLine(self._last_line) p.drawLine(self._last_line)
profiler('.drawLine()') profiler('.drawLine()')
# else:
p.setPen(self.opts['pen']) p.setPen(self.opts['pen'])
p.drawPath(self.path) p.drawPath(self.path)
profiler('.drawPath()') profiler('.drawPath()')

View File

@ -54,7 +54,7 @@ from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
# TODO: load this from a config.toml! # TODO: load this from a config.toml!
_quote_throttle_rate: int = 58 # Hz _quote_throttle_rate: int = 6 + 16 # Hz
# a working tick-type-classes template # a working tick-type-classes template
@ -106,7 +106,7 @@ def chart_maxmin(
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
async def update_linked_charts_graphics( async def graphics_update_loop(
linked: LinkedSplits, linked: LinkedSplits,
stream: tractor.MsgStream, stream: tractor.MsgStream,
ohlcv: np.ndarray, ohlcv: np.ndarray,
@ -258,13 +258,18 @@ async def update_linked_charts_graphics(
) )
last_mx_vlm = mx_vlm_in_view last_mx_vlm = mx_vlm_in_view
for curve_name, shm in vlm_chart._overlays.items(): for curve_name, flow in vlm_chart._flows.items():
update_fsp_chart( update_fsp_chart(
vlm_chart, vlm_chart,
shm, flow.shm,
curve_name, curve_name,
array_key=curve_name, array_key=curve_name,
) )
# is this even doing anything?
flow.plot.vb._set_yrange(
autoscale_linked_plots=False,
name=curve_name,
)
ticks_frame = quote.get('ticks', ()) ticks_frame = quote.get('ticks', ())
@ -411,14 +416,14 @@ async def update_linked_charts_graphics(
# TODO: all overlays on all subplots.. # TODO: all overlays on all subplots..
# run synchronous update on all derived overlays # run synchronous update on all derived overlays
for curve_name, shm in chart._overlays.items(): for curve_name, flow in chart._flows.items():
update_fsp_chart( update_fsp_chart(
chart, chart,
shm, flow.shm,
curve_name, curve_name,
array_key=curve_name, array_key=curve_name,
) )
# chart._set_yrange() # chart.view._set_yrange()
async def check_for_new_bars( async def check_for_new_bars(
@ -473,11 +478,11 @@ async def check_for_new_bars(
) )
# main chart overlays # main chart overlays
for name in price_chart._overlays: # for name in price_chart._flows:
for curve_name in price_chart._flows:
price_chart.update_curve_from_array( price_chart.update_curve_from_array(
name, curve_name,
price_chart._arrays[name] price_chart._arrays[curve_name]
) )
# each subplot # each subplot
@ -614,7 +619,7 @@ async def display_symbol_data(
# start graphics update loop after receiving first live quote # start graphics update loop after receiving first live quote
ln.start_soon( ln.start_soon(
update_linked_charts_graphics, graphics_update_loop,
linkedsplits, linkedsplits,
feed.stream, feed.stream,
ohlcv, ohlcv,

View File

@ -22,6 +22,7 @@ Financial signal processing cluster and real-time graphics management.
''' '''
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
import inspect
from itertools import cycle from itertools import cycle
from typing import Optional, AsyncGenerator, Any from typing import Optional, AsyncGenerator, Any
@ -37,6 +38,7 @@ from .._cacheables import maybe_open_context
from ..calc import humanize from ..calc import humanize
from ..data._sharedmem import ( from ..data._sharedmem import (
ShmArray, ShmArray,
_Token,
try_read, try_read,
) )
from ._chart import ( from ._chart import (
@ -50,7 +52,11 @@ from ._forms import (
) )
from ..fsp._api import maybe_mk_fsp_shm, Fsp from ..fsp._api import maybe_mk_fsp_shm, Fsp
from ..fsp import cascade from ..fsp import cascade
from ..fsp._volume import tina_vwap, dolla_vlm from ..fsp._volume import (
tina_vwap,
dolla_vlm,
flow_rates,
)
from ..log import get_logger from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
@ -153,7 +159,11 @@ async def open_fsp_sidepane(
sidepane.model = FspConfig() sidepane.model = FspConfig()
# just a logger for now until we get fsp configs up and running. # just a logger for now until we get fsp configs up and running.
async def settings_change(key: str, value: str) -> bool: async def settings_change(
key: str,
value: str
) -> bool:
print(f'{key}: {value}') print(f'{key}: {value}')
return True return True
@ -240,7 +250,7 @@ async def run_fsp_ui(
**conf.get('chart_kwargs', {}) **conf.get('chart_kwargs', {})
) )
# specially store ref to shm for lookup in display loop # specially store ref to shm for lookup in display loop
chart._overlays[name] = shm chart._flows[name].shm = shm
else: else:
# create a new sub-chart widget for this fsp # create a new sub-chart widget for this fsp
@ -363,6 +373,7 @@ class FspAdmin:
tuple, tuple,
tuple[tractor.MsgStream, ShmArray] tuple[tractor.MsgStream, ShmArray]
] = {} ] = {}
self._flow_registry: dict[_Token, str] = {}
self.src_shm = src_shm self.src_shm = src_shm
def rr_next_portal(self) -> tractor.Portal: def rr_next_portal(self) -> tractor.Portal:
@ -407,6 +418,11 @@ class FspAdmin:
loglevel=loglevel, loglevel=loglevel,
zero_on_step=conf.get('zero_on_step', False), zero_on_step=conf.get('zero_on_step', False),
shm_registry=[
(token.as_msg(), fsp_name, dst_token.as_msg())
for (token, fsp_name), dst_token
in self._flow_registry.items()
],
) as (ctx, last_index), ) as (ctx, last_index),
ctx.open_stream() as stream, ctx.open_stream() as stream,
@ -439,11 +455,15 @@ class FspAdmin:
fqsn = self.linked.symbol.front_feed() fqsn = self.linked.symbol.front_feed()
# allocate an output shm array # allocate an output shm array
dst_shm, opened = maybe_mk_fsp_shm( key, dst_shm, opened = maybe_mk_fsp_shm(
fqsn, '.'.join(fqsn),
target=target, target=target,
readonly=True, readonly=True,
) )
self._flow_registry[
(self.src_shm._token, target.name)
] = dst_shm._token
# if not opened: # if not opened:
# raise RuntimeError( # raise RuntimeError(
# f'Already started FSP `{fqsn}:{func_name}`' # f'Already started FSP `{fqsn}:{func_name}`'
@ -555,15 +575,22 @@ async def open_vlm_displays(
be spawned here. be spawned here.
''' '''
sig = inspect.signature(flow_rates.func)
params = sig.parameters
async with ( async with (
open_fsp_sidepane( open_fsp_sidepane(
linked, { linked, {
'vlm': { 'flows': {
# TODO: add support for dynamically changing these
'params': { 'params': {
'price_func': { u'\u03BC' + '_type': {
'default_value': 'chl3', 'default_value': str(params['mean_type'].default),
# tell target ``Edit`` widget to not allow },
# edits for now. 'period': {
'default_value': str(params['period'].default),
# make widget un-editable for now.
'widget_kwargs': {'readonly': True}, 'widget_kwargs': {'readonly': True},
}, },
}, },
@ -572,6 +599,12 @@ async def open_vlm_displays(
) as sidepane, ) as sidepane,
open_fsp_admin(linked, ohlcv) as admin, open_fsp_admin(linked, ohlcv) as admin,
): ):
# TODO: support updates
# period_field = sidepane.fields['period']
# period_field.setText(
# str(period_param.default)
# )
# built-in vlm which we plot ASAP since it's # built-in vlm which we plot ASAP since it's
# usually data provided directly with OHLC history. # usually data provided directly with OHLC history.
shm = ohlcv shm = ohlcv
@ -596,18 +629,19 @@ async def open_vlm_displays(
names: list[str], names: list[str],
) -> tuple[float, float]: ) -> tuple[float, float]:
mx = 0 mx = 0
for name in names: for name in names:
mxmn = chart.maxmin(name=name) mxmn = chart.maxmin(name=name)
if mxmn: if mxmn:
mx = max(mxmn[1], mx) ymax = mxmn[1]
if ymax > mx:
# if mx: mx = ymax
# return 0, mxmn[1]
return 0, mx return 0, mx
chart.view._maxmin = partial(maxmin, names=['volume']) chart.view.maxmin = partial(maxmin, names=['volume'])
# TODO: fix the x-axis label issue where if you put # TODO: fix the x-axis label issue where if you put
# the axis on the left it's totally not lined up... # the axis on the left it's totally not lined up...
@ -648,8 +682,9 @@ async def open_vlm_displays(
if dvlm: if dvlm:
tasks_ready = []
# spawn and overlay $ vlm on the same subchart # spawn and overlay $ vlm on the same subchart
shm, started = await admin.start_engine_task( dvlm_shm, started = await admin.start_engine_task(
dolla_vlm, dolla_vlm,
{ # fsp engine conf { # fsp engine conf
@ -663,11 +698,26 @@ async def open_vlm_displays(
}, },
# loglevel, # loglevel,
) )
tasks_ready.append(started)
# FIXME: we should error on starting the same fsp right
# since it might collide with existing shm.. or wait we
# had this before??
# dolla_vlm,
tasks_ready.append(started)
# profiler(f'created shm for fsp actor: {display_name}') # profiler(f'created shm for fsp actor: {display_name}')
await started.wait() # wait for all engine tasks to startup
async with trio.open_nursery() as n:
for event in tasks_ready:
n.start_soon(event.wait)
pi = chart.overlay_plotitem( # dolla vlm overlay
# XXX: the main chart already contains a vlm "units" axis
# so here we add an overlay wth a y-range in
# $ liquidity-value units (normally a fiat like USD).
dvlm_pi = chart.overlay_plotitem(
'dolla_vlm', 'dolla_vlm',
index=0, # place axis on inside (nearest to chart) index=0, # place axis on inside (nearest to chart)
axis_title=' $vlm', axis_title=' $vlm',
@ -679,24 +729,29 @@ async def open_vlm_displays(
digits=2, digits=2,
), ),
}, },
) )
# all to be overlayed curve names
fields = [
'dolla_vlm',
'dark_vlm',
]
dvlm_rate_fields = [
'dvlm_rate',
'dark_dvlm_rate',
]
trade_rate_fields = [
'trade_rate',
'dark_trade_rate',
]
# add custom auto range handler # add custom auto range handler
pi.vb._maxmin = partial( dvlm_pi.vb._maxmin = partial(
maxmin, maxmin,
# keep both regular and dark vlm in view # keep both regular and dark vlm in view
names=['dolla_vlm', 'dark_vlm'], names=fields + dvlm_rate_fields,
) )
curve, _ = chart.draw_curve(
name='dolla_vlm',
data=shm.array,
array_key='dolla_vlm',
overlay=pi,
step_mode=True,
# **conf.get('chart_kwargs', {})
)
# TODO: is there a way to "sync" the dual axes such that only # TODO: is there a way to "sync" the dual axes such that only
# one curve is needed? # one curve is needed?
# hide the original vlm curve since the $vlm one is now # hide the original vlm curve since the $vlm one is now
@ -704,43 +759,112 @@ async def open_vlm_displays(
# liquidity events (well at least on low OHLC periods - 1s). # liquidity events (well at least on low OHLC periods - 1s).
vlm_curve.hide() vlm_curve.hide()
# use slightly less light (then bracket) gray
# for volume from "main exchange" and a more "bluey"
# gray for "dark" vlm.
vlm_color = 'i3'
dark_vlm_color = 'charcoal'
# add dvlm (step) curves to common view
def chart_curves(
names: list[str],
pi: pg.PlotItem,
shm: ShmArray,
step_mode: bool = False,
style: str = 'solid',
) -> None:
for name in names:
if 'dark' in name:
color = dark_vlm_color
elif 'rate' in name:
color = vlm_color
else:
color = 'bracket'
curve, _ = chart.draw_curve(
# name='dolla_vlm',
name=name,
data=shm.array,
array_key=name,
overlay=pi,
color=color,
step_mode=step_mode,
style=style,
)
# TODO: we need a better API to do this.. # TODO: we need a better API to do this..
# specially store ref to shm for lookup in display loop # specially store ref to shm for lookup in display loop
# since only a placeholder of `None` is entered in # since only a placeholder of `None` is entered in
# ``.draw_curve()``. # ``.draw_curve()``.
chart._overlays['dolla_vlm'] = shm chart._flows[name].shm = shm
curve, _ = chart.draw_curve( chart_curves(
fields,
name='dark_vlm', dvlm_pi,
data=shm.array, dvlm_shm,
array_key='dark_vlm',
overlay=pi,
color='charcoal', # darker theme hue
step_mode=True, step_mode=True,
# **conf.get('chart_kwargs', {})
) )
chart._overlays['dark_vlm'] = shm
# XXX: old dict-style config before it was moved into the
# helper task
# 'dolla_vlm': {
# 'func_name': 'dolla_vlm',
# 'zero_on_step': True,
# 'overlay': 'volume',
# 'separate_axes': True,
# 'params': {
# 'price_func': {
# 'default_value': 'chl3',
# # tell target ``Edit`` widget to not allow
# # edits for now.
# 'widget_kwargs': {'readonly': True},
# },
# },
# 'chart_kwargs': {'step_mode': True}
# },
# } # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
# up since this one depends on it.
fr_shm, started = await admin.start_engine_task(
flow_rates,
{ # fsp engine conf
'func_name': 'flow_rates',
'zero_on_step': True,
},
# loglevel,
)
await started.wait()
chart_curves(
dvlm_rate_fields,
dvlm_pi,
fr_shm,
)
# Trade rate overlay
# XXX: requires an additional overlay for
# a trades-per-period (time) y-range.
tr_pi = chart.overlay_plotitem(
'trade_rates',
# TODO: dynamically update period (and thus this axis?)
# title from user input.
axis_title='clears',
axis_side='left',
axis_kwargs={
'typical_max_str': ' 10.0 M ',
'formatter': partial(
humanize,
digits=2,
),
'text_color': vlm_color,
},
)
# add custom auto range handler
tr_pi.vb.maxmin = partial(
maxmin,
# keep both regular and dark vlm in view
names=trade_rate_fields,
)
chart_curves(
trade_rate_fields,
tr_pi,
fr_shm,
# step_mode=True,
# dashed line to represent "individual trades" being
# more "granular" B)
style='dash',
)
for pi in (dvlm_pi, tr_pi):
for name, axis_info in pi.axes.items(): for name, axis_info in pi.axes.items():
# lol this sux XD # lol this sux XD
axis = axis_info['item'] axis = axis_info['item']

View File

@ -342,7 +342,7 @@ class ChartView(ViewBox):
wheelEventRelay = QtCore.Signal(object, object, object) wheelEventRelay = QtCore.Signal(object, object, object)
event_relay_source: 'Optional[ViewBox]' = None event_relay_source: 'Optional[ViewBox]' = None
relays: dict[str, Signal] = {} relays: dict[str, QtCore.Signal] = {}
def __init__( def __init__(
self, self,
@ -421,6 +421,14 @@ class ChartView(ViewBox):
if self._maxmin is None: if self._maxmin is None:
self._maxmin = chart.maxmin self._maxmin = chart.maxmin
@property
def maxmin(self) -> Callable:
return self._maxmin
@maxmin.setter
def maxmin(self, callback: Callable) -> None:
self._maxmin = callback
def wheelEvent( def wheelEvent(
self, self,
ev, ev,
@ -474,7 +482,11 @@ class ChartView(ViewBox):
# lastPos = ev.lastPos() # lastPos = ev.lastPos()
# dif = pos - lastPos # dif = pos - lastPos
# dif = dif * -1 # dif = dif * -1
center = Point(fn.invertQTransform(self.childGroup.transform()).map(ev.pos())) center = Point(
fn.invertQTransform(
self.childGroup.transform()
).map(ev.pos())
)
# scale_y = 1.3 ** (center.y() * -1 / 20) # scale_y = 1.3 ** (center.y() * -1 / 20)
self.scaleBy(s, center) self.scaleBy(s, center)
@ -674,7 +686,8 @@ class ChartView(ViewBox):
# flag to prevent triggering sibling charts from the same linked # flag to prevent triggering sibling charts from the same linked
# set from recursion errors. # set from recursion errors.
autoscale_linked_plots: bool = True, autoscale_linked_plots: bool = True,
autoscale_overlays: bool = False, name: Optional[str] = None,
# autoscale_overlays: bool = False,
) -> None: ) -> None:
''' '''
@ -731,7 +744,12 @@ class ChartView(ViewBox):
) )
if set_range: if set_range:
ylow, yhigh = self._maxmin()
yrange = self._maxmin()
if yrange is None:
return
ylow, yhigh = yrange
# view margins: stay within a % of the "true range" # view margins: stay within a % of the "true range"
diff = yhigh - ylow diff = yhigh - ylow

View File

@ -103,11 +103,6 @@ class ComposedGridLayout:
dict[str, AxisItem], dict[str, AxisItem],
] = {} ] = {}
self._axes2pi: dict[
AxisItem,
dict[str, PlotItem],
] = {}
# TODO: better name? # TODO: better name?
# construct surrounding layouts for placing outer axes and # construct surrounding layouts for placing outer axes and
# their legends and title labels. # their legends and title labels.
@ -158,8 +153,8 @@ class ComposedGridLayout:
for name, axis_info in plotitem.axes.items(): for name, axis_info in plotitem.axes.items():
axis = axis_info['item'] axis = axis_info['item']
# register this plot's (maybe re-placed) axes for lookup. # register this plot's (maybe re-placed) axes for lookup.
self._pi2axes.setdefault(index, {})[name] = axis # print(f'inserting {name}:{axis} to index {index}')
self._axes2pi.setdefault(index, {})[name] = plotitem self._pi2axes.setdefault(name, {})[index] = axis
# enter plot into list for index tracking # enter plot into list for index tracking
self.items.insert(index, plotitem) self.items.insert(index, plotitem)
@ -213,11 +208,12 @@ class ComposedGridLayout:
# invert insert index for layouts which are # invert insert index for layouts which are
# not-left-to-right, top-to-bottom insert oriented # not-left-to-right, top-to-bottom insert oriented
insert_index = index
if name in ('top', 'left'): if name in ('top', 'left'):
index = min(len(axes) - index, 0) insert_index = min(len(axes) - index, 0)
assert index >= 0 assert insert_index >= 0
linlayout.insertItem(index, axis) linlayout.insertItem(insert_index, axis)
axes.insert(index, axis) axes.insert(index, axis)
self._register_item(index, plotitem) self._register_item(index, plotitem)
@ -243,13 +239,15 @@ class ComposedGridLayout:
plot: PlotItem, plot: PlotItem,
name: str, name: str,
) -> AxisItem: ) -> Optional[AxisItem]:
''' '''
Retrieve the named axis for overlayed ``plot``. Retrieve the named axis for overlayed ``plot`` or ``None``
if axis for that name is not shown.
''' '''
index = self.items.index(plot) index = self.items.index(plot)
return self._pi2axes[index][name] named = self._pi2axes[name]
return named.get(index)
def pop( def pop(
self, self,
@ -341,7 +339,7 @@ def mk_relay_method(
# halt/short circuit the graphicscene loop). Further the # halt/short circuit the graphicscene loop). Further the
# surrounding handler for this signal must be allowed to execute # surrounding handler for this signal must be allowed to execute
# and get processed by **this consumer**. # and get processed by **this consumer**.
print(f'{vb.name} rx relayed from {relayed_from.name}') # print(f'{vb.name} rx relayed from {relayed_from.name}')
ev.ignore() ev.ignore()
return slot( return slot(
@ -351,7 +349,7 @@ def mk_relay_method(
) )
if axis is not None: if axis is not None:
print(f'{vb.name} handling axis event:\n{str(ev)}') # print(f'{vb.name} handling axis event:\n{str(ev)}')
ev.accept() ev.accept()
return slot( return slot(
vb, vb,
@ -490,7 +488,6 @@ class PlotItemOverlay:
vb.setZValue(1000) # XXX: critical for scene layering/relaying vb.setZValue(1000) # XXX: critical for scene layering/relaying
self.overlays: list[PlotItem] = [] self.overlays: list[PlotItem] = []
from piker.ui._overlay import ComposedGridLayout
self.layout = ComposedGridLayout( self.layout = ComposedGridLayout(
root_plotitem, root_plotitem,
root_plotitem.layout, root_plotitem.layout,
@ -511,7 +508,7 @@ class PlotItemOverlay:
) -> None: ) -> None:
index = index or 0 index = index or len(self.overlays)
root = self.root_plotitem root = self.root_plotitem
# layout: QGraphicsGridLayout = root.layout # layout: QGraphicsGridLayout = root.layout
self.overlays.insert(index, plotitem) self.overlays.insert(index, plotitem)
@ -613,6 +610,26 @@ class PlotItemOverlay:
''' '''
return self.layout.get_axis(plot, name) return self.layout.get_axis(plot, name)
def get_axes(
self,
name: str,
) -> list[AxisItem]:
'''
Retrieve all axes for all plots with ``name: str``.
If a particular overlay doesn't have a displayed named axis
then it is not delivered in the returned ``list``.
'''
axes = []
for plot in self.overlays:
axis = self.layout.get_axis(plot, name)
if axis:
axes.append(axis)
return axes
# TODO: i guess we need this if you want to detach existing plots # TODO: i guess we need this if you want to detach existing plots
# dynamically? XXX: untested as of now. # dynamically? XXX: untested as of now.
def _disconnect_all( def _disconnect_all(

View File

@ -433,9 +433,12 @@ class OrderMode:
[ [
'notify-send', 'notify-send',
'-u', 'normal', '-u', 'normal',
'-t', '10000', '-t', '1616',
'piker', 'piker',
f'alert: {msg}',
# TODO: add in standard fill/exec info that maybe we
# pack in a broker independent way?
f'{msg["resp"]}: {msg["trigger_price"]}',
], ],
) )
log.runtime(result) log.runtime(result)
@ -666,7 +669,7 @@ async def open_order_mode(
) )
# vbox.setAlignment(feed_label, Qt.AlignBottom) # vbox.setAlignment(feed_label, Qt.AlignBottom)
# vbox.setAlignment(Qt.AlignBottom) # vbox.setAlignment(Qt.AlignBottom)
blank_h = chart.height() - ( _ = chart.height() - (
form.height() + form.height() +
form.fill_bar.height() form.fill_bar.height()
# feed_label.height() # feed_label.height()