Compare commits

...

61 Commits

Author SHA1 Message Date
Tyler Goodlet 9bc7de0cb3 Add notes about how to do mkts "trimming"
Which is basically just "deleting" rows from a column series.
You can only use the trim command from the `.cmd` cli and only with a so
called `LocalClient` currently; it's also sketchy af and caused
a machine to hang due to mem usage..

Ideally we can patch in this functionality for use by the rpc api
and have it not hang like this XD

Pertains to https://github.com/alpacahq/marketstore/issues/264
2022-05-16 14:31:23 -04:00
Tyler Goodlet 5c294f5ed4 Make vlm a float; discrete is so 80s 2022-05-16 14:31:04 -04:00
Tyler Goodlet b2833896a6 Fix output unpack 2022-05-16 08:12:07 -04:00
Tyler Goodlet 78defa00ec Drop old non-working flatten routine 2022-05-15 17:06:52 -04:00
Tyler Goodlet 3cb6b7221c Factor step format data gen into `to_step_format()`
Yet another path ops routine which converts a 1d array into a data
format suitable for rendering a "step curve" graphics path (aka a "bar
graph" but implemented as a continuous line).

Also, factor the `BarItems` rendering logic (which determines whether to
render the literal bars lines or a downsampled curve) into a routine
`render_baritems()` until we figure out the right abstraction layer for
it.
2022-05-15 16:54:50 -04:00
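(For illustration, a minimal sketch of the step-format idea with hypothetical names; the real routine lives in `._pathops`:)

import numpy as np

def to_step_format_sketch(
    x: np.ndarray,
    y: np.ndarray,
) -> tuple[np.ndarray, np.ndarray]:
    # expand each (x, y) sample into a (left, right) pair so a line
    # renderer draws flat "steps" across each x unit instead of
    # diagonals between datums
    x2 = np.broadcast_to(x[:, None], (x.size, 2)) + np.array([-0.5, 0.5])
    y2 = np.broadcast_to(y[:, None], (y.size, 2)).copy()
    return x2, y2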
Tyler Goodlet 06d3cadcc0 Factor ohlc to line data conversion into `._pathops.ohlc_to_line()` 2022-05-15 15:45:06 -04:00
Tyler Goodlet fdd4255246 Drop commented `numba` imports 2022-05-15 15:44:19 -04:00
Tyler Goodlet e9c244ccd0 Move ohlc lines-curve generators into pathops mod 2022-05-15 15:21:25 -04:00
Tyler Goodlet ce68e612de Add `.ui._pathops` module
Starts a module for grouping together all our `QPainterPath` related
generation and data format operations for creation of fast curve
graphics. To start, drops `FastAppendCurve.downsample()` and moves
it to a new `._pathops.xy_downsample()`.
2022-05-15 15:15:14 -04:00
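(A rough sketch of what the new `xy_downsample()` step does, assuming the `ds_m4()` routine shown near the bottom of this diff; the broadcast/flatten details here are illustrative:)

import numpy as np

def xy_downsample_sketch(x, y, uppx: float, px_width: int):
    # m4 bins the series per screen pixel, keeping each bin's
    # (start, min, max, stop) values; y comes back as (nbins, 4)
    nb, x_ds, y_ds = ds_m4(x, y, px_width=px_width, uppx=uppx)
    # pair every retained y value with its bin's x and flatten back
    # to 1d arrays suitable for `QPainterPath` generation
    x_flat = np.broadcast_to(x_ds[:, None], y_ds.shape).flatten()
    return x_flat, y_ds.flatten()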
Tyler Goodlet 3bf907c10f Rename `._ohlc.gen_qpath()` -> `.gen_ohlc_qpath()` 2022-05-15 14:30:13 -04:00
Tyler Goodlet b7f7296f3a Drop `BarItems.update_from_array()`; moved into `Flow` 2022-05-15 14:29:03 -04:00
Tyler Goodlet e3dd933b34 Drop legacy step mode data formatter 2022-05-15 14:12:09 -04:00
Tyler Goodlet f93ac15440 Always delay interaction update profiling 2022-05-15 14:07:13 -04:00
Tyler Goodlet 54ae86544a Add "no-tsdb-found" history load length defaults 2022-05-15 14:07:13 -04:00
Tyler Goodlet 7555cb4318 Use `ms_threshold` throughout remaining profilers 2022-05-15 14:07:13 -04:00
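(Usage per the call sites visible further down this diff; `ms_threshold` appears to be a piker-side patch to the stock `pyqtgraph` profiler and `pg_profile_enabled`/`ms_slower_then` come from `piker._profile`, so treat this as a sketch:)

import pyqtgraph as pg

profiler = pg.debug.Profiler(
    msg='graphics update cycle',
    disabled=not pg_profile_enabled(),
    ms_threshold=ms_slower_then,  # only emit output for slower cycles
    delayed=True,
)
profiler('drew ohlc bars')  # mark a sub-step
profiler.finish()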
Tyler Goodlet b83ec0fad6 Use new flag, add more marks through display loop 2022-05-15 14:07:13 -04:00
Tyler Goodlet a8fa034b61 Use new profiler arg name, add more marks throughout flow update 2022-05-15 14:07:13 -04:00
Tyler Goodlet 60ef3ba258 Drop legacy step path gen, always slice full data
Mostly just dropping old commented code for "step mode" format
generation. Always slice the tail part of the input data and move to the
new `ms_threshold` in the `pg` profiler.
2022-05-15 14:07:13 -04:00
Tyler Goodlet e1e42e4208 Error log brokerd msgs that have `.reqid == None`
Relates to the bug discovered in #310; this should catch out-of-order
msgs which do not have a `.reqid` set and error-log them to console.
Further, add `pformat()` to kraken logging of ems msging.
2022-05-15 14:07:13 -04:00
Tyler Goodlet af61eac389 Only update prepended graphics when actually in view 2022-05-15 14:07:13 -04:00
Tyler Goodlet aafb506384 Add back mx/mn updates for L1-in-view, lost during rebase 2022-05-15 14:07:13 -04:00
Tyler Goodlet 8fec88236f Clean out legacy code from `Flow.update_graphics()` 2022-05-15 14:07:13 -04:00
Tyler Goodlet 083e6205e2 Drop `bar_wap` curve for now, seems to also be causing hangs?! 2022-05-15 14:07:13 -04:00
Tyler Goodlet f560ba396b Add profiler passthrough type annot, comments about appends vs. uppx 2022-05-15 14:07:13 -04:00
Tyler Goodlet 588f7fb7c3 An absolute uppx diff of >= 1 seems more than fine 2022-05-15 14:07:13 -04:00
Tyler Goodlet f01dd72dc2 Up the display throttle rate to 22Hz 2022-05-15 14:07:13 -04:00
Tyler Goodlet 948d12cce9 Only do curve appends on low uppx levels 2022-05-15 14:07:13 -04:00
Tyler Goodlet 2568c634bd Start up with 3k bars 2022-05-15 14:07:13 -04:00
Tyler Goodlet a2b35c96b1 Drop vwap fsp for now; causes hangs.. 2022-05-15 14:07:13 -04:00
Tyler Goodlet a727ca2557 Drop step routine import 2022-05-15 14:07:13 -04:00
Tyler Goodlet 847e5319b5 Drop uppx guard around downsamples on interaction
Since downsampling with the more correct version of m4 (uppx driven
windows sizing) is super fast now we don't need to avoid downsampling
on low uppx values. Further all graphics objects now support in-view
slicing so make sure to use it on interaction updates. Pass in the view
profiler to update method calls for more detailed measuring.

Even moar,
- Add a manual call to `.maybe_downsample_graphics()` inside the mouse
  wheel event handler since it seems that sometimes trailing events get
  lost from the `.sigRangeChangedManually` signal which can result in
  "non-downsampled-enough" graphics on chart given the scroll amount;
  this manual call seems to entirely fix this?
- drop "max zoom" guard since internals now support (near) infinite
  scroll out to graphics becoming a single pixel column line XD
- add back in commented xrange signal connect code for easy testing to
  verify against range updates not happening without it
2022-05-15 14:07:13 -04:00
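(A minimal sketch of that manual wheel-handler call, assuming a `ChartView`-style `ViewBox` subclass with the `maybe_downsample_graphics()` method mentioned above:)

import pyqtgraph as pg

class ChartViewSketch(pg.ViewBox):
    def wheelEvent(self, ev, axis=None):
        super().wheelEvent(ev, axis=axis)
        # trailing `.sigRangeChangedManually` emissions can apparently
        # be dropped, so force a downsample pass after every wheel tick
        self.maybe_downsample_graphics()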
Tyler Goodlet fee8a76222 WIP get incremental step curve updates working
This took longer than i care to admit XD but it definitely adds a huge
speedup, with only a few outstanding correctness bugs:

- panning from left to right causes strange trailing artifacts in the
  flow's fsp (vlm) sub-plot, but only when some data is off-screen on the
  left; it doesn't appear to be an issue if we keep the `._set_yrange()`
  handler hooked up to the `.sigXRangeChanged` signal (but we aren't
  going to because this makes panning way slower). i've got a feeling
  this is a bug to do with the device coordinate cache stuff and we may
  need to report it to Qt core?
- factoring out the step curve logic from
  `FastAppendCurve.update_from_array()` (un)fortunately required some
  logic branch uncoupling but also meant we needed special input controls
  to avoid things like redraws and curve appends for special cases,
  this will hopefully all be better rectified in code when the core of
  this method is moved into a renderer type/implementation.
- the `tina_vwap` fsp curve now somehow causes hangs when doing erratic
  scrolling on downsampled graphics data. i have no idea why or how but
  disabling it makes the issue go away (ui will literally just freeze
  and gobble CPU on a `.paint()` call until you ctrl-c the hell out of
  it). my guess is that something in the logic for standard line curves
  and appends on large data sets is the issue?

Code related changes/hacks:
- drop use of `step_path_arrays_from_1d()`, it was always a bit hacky
  (being based on `pyqtgraph` internals) and was generally hard to
  understand since it returns 1d data instead of the more expected (N,2)
  array of "step levels"; instead this is now implemented (uglily) in
  the `Flow.update_graphics()` block for step curves (which will
  obviously get cleaned up and factored elsewhere).
- add a bunch of new flags to the update method on the fast append
  curve:  `draw_last: bool`, `slice_to_head: int`, `do_append: bool`,
  `should_redraw: bool` which are all controls to aid with previously
  mentioned issues specific to getting step curve updates working
  correctly.
- add a ton of commented tinkering related code (that we may end up
  using) to both the flow and append curve methods that was written as
  part of the effort to get this all working.
- implement all step curve updating inline in `Flow.update_graphics()`
  including prepend and append logic for pre-graphics incremental step
  data maintenance and in-view slicing as well as "last step" graphics
  updating.

Obviously clean up commits coming stat B)
2022-05-15 14:07:13 -04:00
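(For reference, a tiny sketch of the (N, 2) "step levels" buffer maintenance described above; names are hypothetical:)

import numpy as np

def append_step_levels(
    levels: np.ndarray,  # pre-allocated (N, 2) step-levels buffer
    i: int,              # running append index into `levels`
    new_y: np.ndarray,   # freshly appended 1d samples
) -> int:
    # each row holds the (left, right) y-pair for one flat step segment
    n = new_y.size
    levels[i:i + n, :] = new_y[:, None]
    return i + n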
Tyler Goodlet 64de90e482 Drop cursor debounce delay, decrease rate limit 2022-05-15 14:07:13 -04:00
Tyler Goodlet 8ad6f7890a Downsample on every uppx increment since it's way faster 2022-05-15 14:07:13 -04:00
Tyler Goodlet 1d63a71de3 Drop log scaling support since uppx driven scaling seems way faster/better 2022-05-15 14:07:13 -04:00
Tyler Goodlet 88ba1765ba Hipshot, use uppx to drive theoretical px w 2022-05-15 14:07:13 -04:00
Tyler Goodlet 621cbdd015 Fix null match 2022-05-15 14:07:13 -04:00
Tyler Goodlet 243a9aa905 WIP incrementally update step array format 2022-05-15 14:07:13 -04:00
Tyler Goodlet a1de89d825 Always maybe render graphics
Since we have in-view style rendering working for all curve types
(finally) we can avoid the guard for low uppx levels without losing
interaction speed. Further, don't delay the profiler so that the nested
method calls correctly report upward - which wasn't working, likely due
to some kinda GC collection related issue.
2022-05-15 14:07:13 -04:00
Tyler Goodlet 50b86247af Always set coords cache on curves 2022-05-15 14:07:13 -04:00
Tyler Goodlet 4cd8668059 Handle null output case for vlm chart mxmn 2022-05-15 14:07:13 -04:00
Tyler Goodlet 1fbfbf4e4c Right, handle the case where the shm prepend history isn't full XD 2022-05-15 14:07:13 -04:00
Tyler Goodlet e388f57e47 Always use coord cache, add naive view range diffing logic 2022-05-15 14:07:13 -04:00
Tyler Goodlet dbdb548f7f Put mxmn profile mapping at end of method 2022-05-15 14:07:13 -04:00
Tyler Goodlet c69be8b599 If a sample stream is already ded, just warn 2022-05-15 14:07:13 -04:00
Tyler Goodlet 45c5725d61 `FastAppendCurve`: Only render in-view data if possible
More or less this improves update latency like mad. Only draw data in
view and avoid full path regen as much as possible within a given
(down)sampling setting. We now support append path updates with in-view
data and the *SPECIAL CAVEAT* is that we avoid redrawing the whole curve
**only when** we calc an `append_length <= 1` **even if the view range
changed**. XXX: this should change in the future, probably such that the
caller graphics update code can pass a flag which says whether or not to
do a full redraw based on it knowing whether it's an interaction-based
view-range change or a flow update change which doesn't require a full
path re-render.
2022-05-15 14:07:13 -04:00
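(A sketch of that append-length guard as a hypothetical helper; the real logic is inline in the curve's update method:)

import numpy as np

def needs_full_redraw(
    drawn: np.ndarray,  # struct-array slice already baked into the path
    src: np.ndarray,    # latest shm read, with an 'index' field
) -> bool:
    # skip the (expensive) full path regen whenever <= 1 new datums
    # were appended, even if the view range changed
    append_length = int(src['index'][-1] - drawn['index'][-1])
    return append_length > 1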
Tyler Goodlet 46fb3004a1 Remove `._set_yrange()` handler from x-range-change signal 2022-05-15 14:07:13 -04:00
Tyler Goodlet 39f416efa0 Delegate graphics cycle max/min to chart/flows 2022-05-15 14:07:13 -04:00
Tyler Goodlet 9a716de36d Incrementally update flattened OHLC data
After much effort (and exhaustion) but failure to get a view into our
`numpy` OHLC struct-array, this instead allocates an in-thread-memory
array which is updated with flattened data every flow update cycle.

I need to report what I think is a bug to `numpy` core about the whole
view thing not working but, more or less, this gets the same behaviour
and minimizes the work to flatten the sampled data for line-graphics
drawing, thus improving refresh latency when drawing large downsampled curves.

Update the OHLC ds curve with view aware data sliced out from the
pre-allocated and incrementally updated data (we had to add a last index
var `._iflat` to track appends - this should be moved into a renderer
eventually?).
2022-05-15 14:07:13 -04:00
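(The flattening itself is roughly the following sketch; the real x offsets are slightly inset, see `ohlc_to_m4_line()` later in this diff:)

import numpy as np
from numpy.lib import recfunctions as rfn

def ohlc_flatten_sketch(ohlc: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    # 4 line points per bar: open, high, low, close
    y = rfn.structured_to_unstructured(
        ohlc[['open', 'high', 'low', 'close']]
    ).flatten()
    # spread each bar's 4 points across its index step
    x = (
        np.broadcast_to(ohlc['index'][:, None], (ohlc.size, 4))
        + np.array([-0.5, 0.0, 0.0, 0.5])
    ).flatten()
    return x, y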
Tyler Goodlet a05566ab53 Add `FastAppendCurve.draw_last()` 2022-05-15 14:07:13 -04:00
Tyler Goodlet e02d6f156e Don't require data input to constructor 2022-05-15 14:07:13 -04:00
Tyler Goodlet bafa1a02a5 More WIP, implement `BarItems` rendering in `Flow.update_graphics()` 2022-05-15 14:07:13 -04:00
Tyler Goodlet 4877aee729 Add `BarItems.draw_last()` and disable `.update_from_array()` 2022-05-15 14:07:13 -04:00
Tyler Goodlet 340b3b8c25 WIP starting architecture doc str writeup.. 2022-05-15 14:07:13 -04:00
Tyler Goodlet c753fef345 WIP incremental render apis 2022-05-15 14:07:13 -04:00
Tyler Goodlet 281f06a0f8 Warn before return lul 2022-05-15 14:07:13 -04:00
Tyler Goodlet d4e1464b66 Port view downsampling handler to new update apis 2022-05-15 14:07:13 -04:00
Tyler Goodlet cedddb83e4 Port ui components to use flows, drop all late assignments of shm 2022-05-15 14:07:13 -04:00
Tyler Goodlet eb85013b8c Add new `ui._flows` module
This begins the removal of data processing / analysis methods from the
chart widget and instead moving them to our new `Flow` API (in the new
module introduce here) and delegating the old chart methods to the
respective internal flow. Most importantly is no longer storing the
"last read" of an array from shm in an internal chart table (was
`._arrays`) and instead the `ShmArray` instance is passed as input and
stored in the `Flow` instance. This greatly simplifies lookup logic such
that the display loop now doesn't have to worry about reading shm, it
can be done by internal graphics logic as desired. Generally speaking,
all previous `._arrays`/`._graphics` lookups are now delegated to the
entries in the chart's `._flows` table.

The new `Flow` methods are generally better factored and provide more
detailed output regarding data-stream <-> graphics inter-relations for
the future purpose of allowing much more efficient update calls in the
display loop as well as supporting low latency interaction UX.

The concept here is that we're introducing an intermediary layer that
ties together graphics and real-time data flows such that widget code is
oriented around plot layout and the flow apis are oriented around
real-time low latency updates and providing an efficient high level
metric layer for the UX.

The summary api transition is something like:
- `update_graphics_from_array()` -> `.update_graphics_from_flow()`
- `.bars_range()` -> `Flow.datums_range()`
2022-05-15 14:07:13 -04:00
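(In sketch form the lookup transition looks something like this, names per this diff with `chart` being a `ChartPlotWidget`:)

def read_flow_sketch(chart, name: str):
    # new style: the per-name Flow owns both the shm handle and the
    # graphics object (was: chart._arrays[name] / chart._graphics[name])
    flow = chart._flows[name]
    array = flow.shm.array
    graphics = flow.graphics
    # datums_range() replaces the old chart.bars_range() math
    first, l, lbar, rbar, r, last = flow.datums_range()
    return array, graphics, (lbar, rbar)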
Tyler Goodlet 4d4c7825e5 Drop task-per-method `trio`-`asyncio` proxying
Use method proxies through the remaining endpoints and drop the old
spawn-a-task-per-method-call style helpers from the module.
2022-05-15 13:59:18 -04:00
Tyler Goodlet 58da3ceecf Proxy heaven, choose one "preferred data client"
In order to expose more `asyncio` powered `Client` methods to endpoint
task-code this adds a more extensive and layered set of `MethodProxy`
loading routines, in dependency order these are:
- `load_clients_for_trio()` a `tractor.to_asyncio.open_channel_from()`
  entry-point factory for loading all scanned clients on the `asyncio` side
  and delivering them over the inter-task channel to a `trio`-side task.
- `get_preferred_data_client()` a simple client instance loading routine
  which reads from the user's ``brokers.toml`` -> `prefer_data_account:
  list[str]` which must list account names, in priority order, that are
  acceptable to be used as the main "data connection client" such that
  only one of the detected clients is used for data (whereas the rest
  are used only for order entry).
- `open_client_proxies()` which delivers the detected `Client` set
  wrapped each in a `MethodProxy`.
- `open_data_client()` which directly delivers the preferred data client
  as a proxy for `trio` tasks.
- update `open_client_method_proxy()` and `open_client_proxy` to require
  an input `Client` instance.

Further impl details:
- add `MethodProxy._aio_ns` to ref the original `asyncio` side proxied instance
- add `Client.trades()` to pull executions from the last day/session
- load proxies inside `trades_dialogue` and use the new `.trades()`
  method to try and pull a fill ledger for eventual correct pp price
  calcs (pertains to #307)..
2022-05-15 13:59:18 -04:00
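(Endpoint task-code can then be written as straight `trio` against the proxy, e.g. this sketch using the methods added in this changeset, with `open_data_client` imported from the ib backend module:)

async def backfill_sketch(fqsn: str):
    # open the single "preferred data client" (per the user's
    # `ib.prefer_data_account` brokers.toml entry) as a MethodProxy
    async with open_data_client() as proxy:
        bars = await proxy.bars(fqsn=fqsn)
        fills = await proxy.trades()
        return bars, fills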
22 changed files with 2145 additions and 1315 deletions

View File

@ -14,14 +14,26 @@ secret = ""
[ib]
host = "127.0.0.1"
# when clients are being scanned this determines
# which clients are preferred to be used for data
# feeds based on the order of account names, if
# detected as active on an API client.
prefer_data_account = [
'paper',
'margin',
'ira',
]
# the order in which ports will be scanned
# (by the `brokerd` daemon-actor)
# is determined by the line order here.
ports.gw = 4002
ports.tws = 7497
ports.order = ["gw", "tws",]
# the order in which accounts will be selectable
# in the order mode UI (if found via clients during
# API-app scanning) when a new symbol is loaded.
accounts.paper = "XX0000000"
accounts.margin = "X0000000"
accounts.ira = "X0000000"
accounts.paper = "XX0000000"
# the order in which accounts will be selected (if found through
# `brokerd`) when a new symbol is loaded
accounts_order = ['paper', 'margin', 'ira']

View File

@ -22,7 +22,9 @@ built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``.
"""
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from contextlib import AsyncExitStack
from dataclasses import asdict, astuple
from datetime import datetime
from functools import partial
@ -39,6 +41,7 @@ import inspect
import logging
from random import randint
import time
from types import SimpleNamespace
import trio
@ -276,6 +279,27 @@ class Client:
# NOTE: the ib.client here is "throttled" to 45 rps by default
async def trades(
self,
# api_only: bool = False,
) -> dict[str, Any]:
# orders = await self.ib.reqCompletedOrdersAsync(
# apiOnly=api_only
# )
fills = await self.ib.reqExecutionsAsync()
norm_fills = []
for fill in fills:
fill = fill._asdict() # namedtuple
for key, val in fill.copy().items():
if isinstance(val, Contract):
fill[key] = asdict(val)
norm_fills.append(fill)
return norm_fills
async def bars(
self,
fqsn: str,
@ -894,7 +918,7 @@ async def load_aio_clients(
client_id: Optional[int] = None,
) -> Client:
) -> dict[str, Client]:
'''
Return an ``ib_insync.IB`` instance wrapped in our client API.
@ -1063,12 +1087,7 @@ async def load_aio_clients(
'Check your `brokers.toml` and/or network'
) from _err
# retrieve first loaded client
clients = list(_client_cache.values())
if clients:
client = clients[0]
yield client, _client_cache, _accounts2clients
yield _accounts2clients
# TODO: this in a way that works xD
# finally:
@ -1080,62 +1099,90 @@ async def load_aio_clients(
# raise
async def _aio_run_client_method(
meth: str,
to_trio=None,
from_trio=None,
client=None,
**kwargs,
) -> None:
async def load_clients_for_trio(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
async with load_aio_clients() as (
_client,
clients,
accts2clients,
) -> None:
'''
Pure async mngr proxy to ``load_aio_clients()``.
'''
async with load_aio_clients() as accts2clients:
to_trio.send_nowait(accts2clients)
# TODO: maybe a sync event to wait on instead?
await asyncio.sleep(float('inf'))
@acm
async def open_client_proxies() -> tuple[
dict[str, MethodProxy],
dict[str, Client],
]:
proxies: dict[str, MethodProxy] = {}
async with (
tractor.to_asyncio.open_channel_from(
load_clients_for_trio,
) as (clients, from_aio),
AsyncExitStack() as stack
):
client = client or _client
async_meth = getattr(client, meth)
for acct_name, client in clients.items():
proxy = await stack.enter_async_context(
open_client_proxy(client),
)
proxies[acct_name] = proxy
# handle streaming methods
args = tuple(inspect.getfullargspec(async_meth).args)
if to_trio and 'to_trio' in args:
kwargs['to_trio'] = to_trio
log.runtime(f'Running {meth}({kwargs})')
return await async_meth(**kwargs)
yield proxies, clients
async def _trio_run_client_method(
method: str,
client: Optional[Client] = None,
**kwargs,
def get_preferred_data_client(
clients: dict[str, Client],
) -> None:
) -> tuple[str, Client]:
'''
Asyncio entry point to run tasks against the ``ib_insync`` api.
Load and return the (first found) `Client` instance that is
preferred and should be used for data by iterating, in priority
order, the ``ib.prefer_data_account: list[str]`` account names in
the user's ``brokers.toml`` file.
'''
ca = tractor.current_actor()
assert ca.is_infected_aio()
conf = get_config()
data_accounts = conf['prefer_data_account']
# if the method is an *async gen* stream for it
# meth = getattr(Client, method)
for name in data_accounts:
client = clients.get(f'ib.{name}')
if client:
return name, client
else:
raise ValueError(
'No preferred data client could be found:\n'
f'{data_accounts}'
)
# args = tuple(inspect.getfullargspec(meth).args)
# if inspect.isasyncgenfunction(meth) or (
# # if the method is an *async func* but manually
# # streams back results, make sure to also stream it
# 'to_trio' in args
# ):
# kwargs['_treat_as_stream'] = True
@acm
async def open_data_client() -> MethodProxy:
'''
Open the first found preferred "data client" as defined in the
user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable
and deliver that client wrapped in a ``MethodProxy``.
return await to_asyncio.run_task(
_aio_run_client_method,
meth=method,
client=client,
**kwargs
)
'''
async with (
open_client_proxies() as (proxies, clients),
):
account_name, client = get_preferred_data_client(clients)
proxy = proxies.get(f'ib.{account_name}')
if not proxy:
raise ValueError(
f'No preferred data client could be found for {account_name}!'
)
yield proxy
class MethodProxy:
@ -1144,10 +1191,12 @@ class MethodProxy:
self,
chan: to_asyncio.LinkedTaskChannel,
event_table: dict[str, trio.Event],
asyncio_ns: SimpleNamespace,
) -> None:
self.chan = chan
self.event_table = event_table
self._aio_ns = asyncio_ns
async def _run_method(
self,
@ -1213,61 +1262,64 @@ class MethodProxy:
async def open_aio_client_method_relay(
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
client: Client,
event_consumers: dict[str, trio.Event],
) -> None:
async with load_aio_clients() as (
client,
clients,
accts2clients,
):
to_trio.send_nowait(client)
to_trio.send_nowait(client)
# TODO: separate channel for error handling?
client.inline_errors(to_trio)
# TODO: separate channel for error handling?
client.inline_errors(to_trio)
# relay all method requests to ``asyncio``-side client and
# deliver back results
while not to_trio._closed:
msg = await from_trio.get()
if msg is None:
print('asyncio PROXY-RELAY SHUTDOWN')
break
# relay all method requests to ``asyncio``-side client and deliver
# back results
while not to_trio._closed:
msg = await from_trio.get()
if msg is None:
print('asyncio PROXY-RELAY SHUTDOWN')
break
meth_name, kwargs = msg
meth = getattr(client, meth_name)
meth_name, kwargs = msg
meth = getattr(client, meth_name)
try:
resp = await meth(**kwargs)
# echo the msg back
to_trio.send_nowait({'result': resp})
try:
resp = await meth(**kwargs)
# echo the msg back
to_trio.send_nowait({'result': resp})
except (
RequestError,
except (
RequestError,
# TODO: relay all errors to trio?
# BaseException,
) as err:
to_trio.send_nowait({'exception': err})
# TODO: relay all errors to trio?
# BaseException,
) as err:
to_trio.send_nowait({'exception': err})
@acm
async def open_client_proxy() -> MethodProxy:
async def open_client_proxy(
client: Client,
) -> MethodProxy:
# try:
event_table = {}
async with (
to_asyncio.open_channel_from(
open_aio_client_method_relay,
client=client,
event_consumers=event_table,
) as (first, chan),
trio.open_nursery() as relay_n,
):
assert isinstance(first, Client)
proxy = MethodProxy(chan, event_table)
proxy = MethodProxy(
chan,
event_table,
asyncio_ns=first,
)
# mock all remote methods on ib ``Client``.
for name, method in inspect.getmembers(
@ -1318,7 +1370,7 @@ async def get_client(
'''
# TODO: the IPC via portal relay layer for when this current
# actor isn't in aio mode.
async with open_client_proxy() as proxy:
async with open_data_client() as proxy:
yield proxy
@ -1535,6 +1587,7 @@ async def get_bars(
# )
# TODO: some kinda resp here that indicates success
# otherwise retry?
# port = proxy._aio_ns.ib.client.port
await data_reset_hack()
# TODO: a while loop here if we timeout?
@ -1542,14 +1595,9 @@ async def get_bars(
('history', hist_ev),
# ('live', live_ev),
]:
# with trio.move_on_after(22) as cs:
await ev.wait()
log.info(f"{name} DATA RESET")
# if cs.cancelled_caught:
# log.warning("reset hack failed on first try?")
# await tractor.breakpoint()
fails += 1
continue
@ -1566,8 +1614,12 @@ async def open_history_client(
symbol: str,
) -> tuple[Callable, int]:
'''
History retrieval endpoint - delivers a historical frame callable
that takes in ``pendulum.datetime`` and returns ``numpy`` arrays.
async with open_client_proxy() as proxy:
'''
async with open_data_client() as proxy:
async def get_hist(
end_dt: Optional[datetime] = None,
@ -1579,7 +1631,7 @@ async def open_history_client(
# TODO: add logic here to handle tradable hours and only grab
# valid bars in the range
if out == (None, None):
if out is None:
# could be trying to retrieve bars over weekend
log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(
@ -1632,7 +1684,8 @@ async def backfill_bars(
with trio.CancelScope() as cs:
# async with open_history_client(fqsn) as proxy:
async with open_client_proxy() as proxy:
# async with open_client_proxy() as proxy:
async with open_data_client() as proxy:
out, fails = await get_bars(proxy, fqsn)
@ -1729,16 +1782,16 @@ async def _setup_quote_stream(
'''
Stream a ticker using the std L1 api.
This task is ``asyncio``-side and must be called from
``tractor.to_asyncio.open_channel_from()``.
'''
global _quote_streams
to_trio.send_nowait(None)
async with load_aio_clients() as (
client,
clients,
accts2clients,
):
async with load_aio_clients() as accts2clients:
caccount_name, client = get_preferred_data_client(accts2clients)
contract = contract or (await client.find_contract(symbol))
ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts))
@ -1828,6 +1881,7 @@ async def open_aio_quote_stream(
_setup_quote_stream,
symbol=symbol,
contract=contract,
) as (first, from_aio):
# cache feed for later consumers
@ -1858,122 +1912,120 @@ async def stream_quotes(
sym = symbols[0]
log.info(f'request for real-time quotes: {sym}')
con, first_ticker, details = await _trio_run_client_method(
method='get_sym_details',
symbol=sym,
)
first_quote = normalize(first_ticker)
# print(f'first quote: {first_quote}')
async with open_data_client() as proxy:
def mk_init_msgs() -> dict[str, dict]:
# pass back some symbol info like min_tick, trading_hours, etc.
syminfo = asdict(details)
syminfo.update(syminfo['contract'])
con, first_ticker, details = await proxy.get_sym_details(symbol=sym)
first_quote = normalize(first_ticker)
# print(f'first quote: {first_quote}')
# nested dataclass we probably don't need and that won't IPC serialize
syminfo.pop('secIdList')
def mk_init_msgs() -> dict[str, dict]:
# pass back some symbol info like min_tick, trading_hours, etc.
syminfo = asdict(details)
syminfo.update(syminfo['contract'])
# TODO: more consistent field translation
atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']]
# nested dataclass we probably don't need and that won't IPC
# serialize
syminfo.pop('secIdList')
# for stocks it seems TWS reports too small a tick size
# such that you can't submit orders with that granularity?
min_tick = 0.01 if atype == 'stock' else 0
# TODO: more consistent field translation
atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']]
syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)
# for stocks it seems TWS reports too small a tick size
# such that you can't submit orders with that granularity?
min_tick = 0.01 if atype == 'stock' else 0
# for "traditional" assets, volume is normally discreet, not a float
syminfo['lot_tick_size'] = 0.0
syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick)
# TODO: for loop through all symbols passed in
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': syminfo,
'fqsn': first_quote['fqsn'],
# for "traditional" assets, volume is normally discreet, not
# a float
syminfo['lot_tick_size'] = 0.0
# TODO: for loop through all symbols passed in
init_msgs = {
# pass back token, and bool, signalling if we're the writer
# and that history has been written
sym: {
'symbol_info': syminfo,
'fqsn': first_quote['fqsn'],
}
}
}
return init_msgs
return init_msgs
init_msgs = mk_init_msgs()
init_msgs = mk_init_msgs()
# TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff.
with trio.move_on_after(1):
contract, first_ticker, details = await _trio_run_client_method(
method='get_quote',
symbol=sym,
)
# TODO: we should instead spawn a task that waits on a feed to start
# and let it wait indefinitely..instead of this hard coded stuff.
with trio.move_on_after(1):
contract, first_ticker, details = await proxy.get_quote(symbol=sym)
# it might be outside regular trading hours so see if we can at
# least grab history.
if isnan(first_ticker.last):
task_status.started((init_msgs, first_quote))
# it might be outside regular trading hours so see if we can at
# least grab history.
if isnan(first_ticker.last):
task_status.started((init_msgs, first_quote))
# it's not really live but this will unblock
# the brokerd feed task to tell the ui to update?
feed_is_live.set()
# block and let data history backfill code run.
await trio.sleep_forever()
return # we never expect feed to come up?
async with open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream:
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
task_status.started((init_msgs, first_quote))
async with aclosing(stream):
if type(first_ticker.contract) not in (
ibis.Commodity,
ibis.Forex
):
# wait for real volume on feed (trading might be closed)
while True:
ticker = await stream.receive()
# for a real volume contract we wait for the first
# "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live
# it's not really live but this will unblock
# the brokerd feed task to tell the ui to update?
feed_is_live.set()
# last = time.time()
async for ticker in stream:
quote = normalize(ticker)
await send_chan.send({quote['fqsn']: quote})
# block and let data history backfill code run.
await trio.sleep_forever()
return # we never expect feed to come up?
async with open_aio_quote_stream(
symbol=sym,
contract=con,
) as stream:
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash)
first_ticker.ticks = []
task_status.started((init_msgs, first_quote))
async with aclosing(stream):
if type(first_ticker.contract) not in (
ibis.Commodity,
ibis.Forex
):
# wait for real volume on feed (trading might be closed)
while True:
ticker = await stream.receive()
# for a real volume contract we wait for the first
# "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
not ticker.rtTime
):
# spin consuming tickers until we get a real
# market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first real volume tick")
# ugh, clear ticks since we've consumed them
# (ahem, ib_insync is truly stateful trash)
ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell caller quotes are now coming in live
feed_is_live.set()
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# last = time.time()
async for ticker in stream:
quote = normalize(ticker)
await send_chan.send({quote['fqsn']: quote})
# ugh, clear ticks since we've consumed them
ticker.ticks = []
# last = time.time()
def pack_position(
@ -2123,11 +2175,16 @@ async def trades_dialogue(
# deliver positions to subscriber before anything else
all_positions = []
accounts = set()
clients: list[tuple[Client, trio.MemoryReceiveChannel]] = []
async with trio.open_nursery() as nurse:
for account, client in _accounts2clients.items():
async with (
trio.open_nursery() as nurse,
open_client_proxies() as (proxies, aioclients),
):
# for account, client in _accounts2clients.items():
for account, proxy in proxies.items():
client = aioclients[account]
async def open_stream(
task_status: TaskStatus[
@ -2149,7 +2206,8 @@ async def trades_dialogue(
assert account in accounts_def
accounts.add(account)
for client in _client_cache.values():
# for client in _client_cache.values():
for client in aioclients.values():
for pos in client.positions():
msg = pack_position(pos)
@ -2160,6 +2218,16 @@ async def trades_dialogue(
all_positions.append(msg.dict())
trades: list[dict] = []
for proxy in proxies.values():
trades.append(await proxy.trades())
log.info(f'Loaded {len(trades)} from this session')
# TODO: write trades to local ``trades.toml``
# - use above per-session trades data and write to local file
# - get the "flex reports" working and pull historical data and
# also save locally.
await ctx.started((
all_positions,
tuple(name for name in accounts_def if name in accounts),
@ -2352,95 +2420,96 @@ async def open_symbol_search(
ctx: tractor.Context,
) -> None:
# load all symbols locally for fast search
# TODO: load user defined symbol set locally for fast search?
await ctx.started({})
async with ctx.open_stream() as stream:
last = time.time()
async for pattern in stream:
log.debug(f'received {pattern}')
now = time.time()
assert pattern, 'IB can not accept blank search pattern'
# throttle search requests to no faster than 1Hz
diff = now - last
if diff < 1.0:
log.debug('throttle sleeping')
await trio.sleep(diff)
try:
pattern = stream.receive_nowait()
except trio.WouldBlock:
pass
if not pattern or pattern.isspace():
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client
# side will cache a null set result and show nothing
# to the user on re-searches when this query times
# out. We probably need a special "timeout" msg
# or something...
# XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block
await stream.send({})
continue
log.debug(f'searching for {pattern}')
async with open_data_client() as proxy:
async with ctx.open_stream() as stream:
last = time.time()
# async batch search using api stocks endpoint and module
# defined adhoc symbol set.
stock_results = []
async for pattern in stream:
log.debug(f'received {pattern}')
now = time.time()
async def stash_results(target: Awaitable[list]):
stock_results.extend(await target)
assert pattern, 'IB can not accept blank search pattern'
async with trio.open_nursery() as sn:
sn.start_soon(
stash_results,
_trio_run_client_method(
method='search_symbols',
pattern=pattern,
upto=5,
# throttle search requests to no faster than 1Hz
diff = now - last
if diff < 1.0:
log.debug('throttle sleeping')
await trio.sleep(diff)
try:
pattern = stream.receive_nowait()
except trio.WouldBlock:
pass
if not pattern or pattern.isspace():
log.warning('empty pattern received, skipping..')
# TODO: *BUG* if nothing is returned here the client
# side will cache a null set result and show nothing
# to the user on re-searches when this query times
# out. We probably need a special "timeout" msg
# or something...
# XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block
await stream.send({})
continue
log.debug(f'searching for {pattern}')
last = time.time()
# async batch search using api stocks endpoint and module
# defined adhoc symbol set.
stock_results = []
async def stash_results(target: Awaitable[list]):
stock_results.extend(await target)
async with trio.open_nursery() as sn:
sn.start_soon(
stash_results,
proxy.search_symbols(
pattern=pattern,
upto=5,
),
)
)
# trigger async request
await trio.sleep(0)
# trigger async request
await trio.sleep(0)
# match against our ad-hoc set immediately
adhoc_matches = fuzzy.extractBests(
# match against our ad-hoc set immediately
adhoc_matches = fuzzy.extractBests(
pattern,
list(_adhoc_futes_set),
score_cutoff=90,
)
log.info(f'fuzzy matched adhocs: {adhoc_matches}')
adhoc_match_results = {}
if adhoc_matches:
# TODO: do we need to pull contract details?
adhoc_match_results = {i[0]: {} for i in adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}')
stock_matches = fuzzy.extractBests(
pattern,
list(_adhoc_futes_set),
score_cutoff=90,
stock_results,
score_cutoff=50,
)
log.info(f'fuzzy matched adhocs: {adhoc_matches}')
adhoc_match_results = {}
if adhoc_matches:
# TODO: do we need to pull contract details?
adhoc_match_results = {i[0]: {} for i in adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}')
stock_matches = fuzzy.extractBests(
pattern,
stock_results,
score_cutoff=50,
)
matches = adhoc_match_results | {
item[0]: {} for item in stock_matches
}
# TODO: we used to deliver contract details
# {item[2]: item[0] for item in stock_matches}
matches = adhoc_match_results | {
item[0]: {} for item in stock_matches
}
# TODO: we used to deliver contract details
# {item[2]: item[0] for item in stock_matches}
log.debug(f"sending matches: {matches.keys()}")
await stream.send(matches)
log.debug(f"sending matches: {matches.keys()}")
await stream.send(matches)
async def data_reset_hack(
@ -2462,6 +2531,10 @@ async def data_reset_hack(
- integration with ``ib-gw`` run in docker + Xorg?
'''
# TODO: see if we can find which window is mapped to which process?
# for eg. if we can launch a paper account with docker and then find
# the pid of it, can we send keycommands to that container somehow?
# TODO: try out this lib instead, seems to be the most modern
# and uses the underlying lib:
# https://github.com/rshk/python-libxdo
@ -2476,8 +2549,8 @@ async def data_reset_hack(
try:
import i3ipc
except ImportError:
return False
log.warning('IB data hack not supported on ur platformz')
return False
i3 = i3ipc.Connection()
t = i3.get_tree()
@ -2523,7 +2596,7 @@ async def data_reset_hack(
# move mouse to bottom left of window (where there should
# be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4),
'mousemove_relative', '--sync', str(w - 4), str(h - 4),
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id,

View File

@ -21,6 +21,7 @@ Kraken backend.
from contextlib import asynccontextmanager as acm
from dataclasses import asdict, field
from datetime import datetime
from pprint import pformat
from typing import Any, Optional, AsyncIterator, Callable, Union
import time
@ -569,7 +570,10 @@ async def handle_order_requests(
order: BrokerdOrder
async for request_msg in ems_order_stream:
log.info(f'Received order request {request_msg}')
log.info(
'Received order request:\n'
f'{pformat(request_msg)}'
)
action = request_msg['action']
@ -628,6 +632,7 @@ async def handle_order_requests(
# update the internal pairing of oid to krakens
# txid with the new txid that is returned on edit
reqid = resp['result']['txid']
# deliver ack that order has been submitted to broker routing
await ems_order_stream.send(
BrokerdOrderAck(
@ -788,7 +793,10 @@ async def trades_dialogue(
# Get websocket token for authenticated data stream
# Assert that a token was actually received.
resp = await client.endpoint('GetWebSocketsToken', {})
# lol wtf is this..
assert resp['error'] == []
token = resp['result']['token']
async with (

View File

@ -561,7 +561,10 @@ async def translate_and_relay_brokerd_events(
name = brokerd_msg['name']
log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}')
log.info(
f'Received broker trade event:\n'
f'{pformat(brokerd_msg)}'
)
if name == 'position':
@ -613,19 +616,28 @@ async def translate_and_relay_brokerd_events(
# packed at submission since we already know it ahead of
# time
paper = brokerd_msg['broker_details'].get('paper_info')
ext = brokerd_msg['broker_details'].get('external')
if paper:
# paperboi keeps the ems id up front
oid = paper['oid']
else:
elif ext:
# may be an order msg specified as "external" to the
# piker ems flow (i.e. generated by some other
# external broker backend client (like tws for ib)
ext = brokerd_msg['broker_details'].get('external')
if ext:
log.error(f"External trade event {ext}")
log.error(f"External trade event {ext}")
continue
else:
# something is out of order, we don't have an oid for
# this broker-side message.
log.error(
f'Unknown oid: {oid} for msg:\n'
f'{pformat(brokerd_msg)}\n'
'Unable to relay message to client side!?'
)
else:
# check for existing live flow entry
entry = book._ems_entries.get(oid)
@ -823,7 +835,9 @@ async def process_client_order_cmds(
if reqid:
# send cancel to brokerd immediately!
log.info("Submitting cancel for live order {reqid}")
log.info(
f'Submitting cancel for live order {reqid}'
)
await brokerd_order_stream.send(msg.dict())

View File

@ -142,11 +142,17 @@ async def broadcast(
shm: Optional[ShmArray] = None,
) -> None:
# broadcast the buffer index step to any subscribers for
# a given sample period.
'''
Broadcast the given ``shm: ShmArray``'s buffer index step to any
subscribers for a given sample period.
The sent msg will include the first and last index which slice into
the buffer's non-empty data.
'''
subs = sampler.subscribers.get(delay_s, ())
last = -1
first = last = -1
if shm is None:
periods = sampler.ohlcv_shms.keys()
@ -156,11 +162,16 @@ async def broadcast(
if periods:
lowest = min(periods)
shm = sampler.ohlcv_shms[lowest][0]
first = shm._first.value
last = shm._last.value
for stream in subs:
try:
await stream.send({'index': last})
await stream.send({
'first': first,
'last': last,
'index': last,
})
except (
trio.BrokenResourceError,
trio.ClosedResourceError
@ -168,7 +179,12 @@ async def broadcast(
log.error(
f'{stream._ctx.chan.uid} dropped connection'
)
subs.remove(stream)
try:
subs.remove(stream)
except ValueError:
log.warning(
f'{stream._ctx.chan.uid} sub already removed!?'
)
@tractor.context

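(Hypothetical subscriber-side handling of the richer step msg; the `first`/`last` pair lets a consumer distinguish prepends from plain appends. All names below are assumptions, only the msg shape comes from this diff:)

async def consume_index_steps(sample_stream, recompute_history):
    prev_first = None
    async for msg in sample_stream:
        first, last = msg['first'], msg['last']
        if prev_first is not None and first < prev_first:
            # `first` moved back: history was prepended into the
            # buffer, so trigger a full recompute of derived data
            recompute_history(first, last)
        prev_first = first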
View File

@ -33,7 +33,7 @@ ohlc_fields = [
('high', float),
('low', float),
('close', float),
('volume', int),
('volume', float),
('bar_wap', float),
]

View File

@ -227,7 +227,7 @@ def diff_history(
# the + 1 is because ``last_tsdb_dt`` is pulled from
# the last row entry for the ``'time'`` field retrieved
# from the tsdb.
to_push = array[abs(s_diff)+1:]
to_push = array[abs(s_diff) + 1:]
else:
# pass back only the portion of the array that is
@ -250,6 +250,7 @@ async def start_backfill(
last_tsdb_dt: Optional[datetime] = None,
storage: Optional[Storage] = None,
write_tsdb: bool = True,
tsdb_is_up: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@ -265,8 +266,8 @@ async def start_backfill(
# sample period step size in seconds
step_size_s = (
pendulum.from_timestamp(times[-1]) -
pendulum.from_timestamp(times[-2])
pendulum.from_timestamp(times[-1])
- pendulum.from_timestamp(times[-2])
).seconds
# "frame"'s worth of sample period steps in seconds
@ -291,25 +292,33 @@ async def start_backfill(
# let caller unblock and deliver latest history frame
task_status.started((shm, start_dt, end_dt, bf_done))
# based on the sample step size, maybe load a certain amount history
if last_tsdb_dt is None:
# maybe a better default (they don't seem to define epoch?!)
# based on the sample step size load a certain amount
# history
if step_size_s == 1:
last_tsdb_dt = pendulum.now().subtract(days=2)
elif step_size_s == 60:
last_tsdb_dt = pendulum.now().subtract(years=2)
else:
if step_size_s not in (1, 60):
raise ValueError(
'`piker` only needs to support 1m and 1s sampling '
'but ur api is trying to deliver a longer '
f'timeframe of {step_size_s} ' 'seconds.. so ye, dun '
'do dat bruh.'
'do dat brudder.'
)
# when no tsdb "last datum" is provided, we just load
# some near-term history.
periods = {
1: {'days': 1},
60: {'days': 14},
}
if tsdb_is_up:
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 6},
60: {'years': 2},
}
kwargs = periods[step_size_s]
last_tsdb_dt = start_dt.subtract(**kwargs)
# configure async query throttling
erlangs = config.get('erlangs', 1)
rate = config.get('rate', 1)
@ -567,8 +576,8 @@ async def start_backfill(
start_dt,
end_dt,
) = await get_ohlc_frame(
input_end_dt=last_shm_prepend_dt,
iter_dts_gen=idts,
input_end_dt=last_shm_prepend_dt,
iter_dts_gen=idts,
)
last_epoch = to_push['time'][-1]
diff = start - last_epoch
@ -794,6 +803,15 @@ async def manage_history(
# manually trigger step update to update charts/fsps
# which need an incremental update.
# NOTE: the way this works is super duper
# un-intuitive right now:
# - the broadcaster fires a msg to the fsp subsystem.
# - fsp subsys then checks for a sample step diff and
# possibly recomputes prepended history.
# - the fsp then sends back to the parent actor
# (usually a chart showing graphics for said fsp)
# which tells the chart to conduct a manual full
# graphics loop cycle.
for delay_s in sampler.subscribers:
await broadcast(delay_s)
@ -993,7 +1011,7 @@ async def open_feed_bus(
brokername: str,
symbol: str, # normally expected to the broker-specific fqsn
loglevel: str,
tick_throttle: Optional[float] = None,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
) -> None:
@ -1254,7 +1272,7 @@ async def install_brokerd_search(
# a backend module?
pause_period=getattr(
brokermod, '_search_conf', {}
).get('pause_period', 0.0616),
).get('pause_period', 0.0616),
):
yield

View File

@ -230,8 +230,8 @@ _ohlcv_dt = [
# ohlcv sampling
('Open', 'f4'),
('High', 'f4'),
('Low', 'i8'),
('Close', 'i8'),
('Low', 'f4'),
('Close', 'f4'),
('Volume', 'f4'),
]
@ -547,6 +547,17 @@ class Storage:
if err:
raise MarketStoreError(err)
# XXX: currently the only way to do this is through the CLI:
# sudo ./marketstore connect --dir ~/.config/piker/data
# >> \show mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
# and this seems to block and use up mem..
# >> \trim mnq.globex.20220617.ib/1Sec/OHLCV 2022-05-15
# relevant source code for this is here:
# https://github.com/alpacahq/marketstore/blob/master/cmd/connect/session/trim.go#L14
# def delete_range(self, start_dt, end_dt) -> None:
# ...
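# (A hedged sketch of scripting that CLI session from Python; the
# `marketstore` binary location is an assumption and the hang/mem
# caveats above still apply:)
#
# import os
# import subprocess
#
# def trim_via_cli_sketch(tbk: str, date: str) -> None:
#     # drive the only known trim path: the `marketstore connect` shell
#     datadir = os.path.expanduser('~/.config/piker/data')
#     subprocess.run(
#         ['marketstore', 'connect', '--dir', datadir],
#         input=f'\\trim {tbk} {date}\n'.encode(),
#         timeout=60,
#     )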
@acm
async def open_storage_client(

View File

@ -369,7 +369,12 @@ async def cascade(
# always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await client_stream.send('update')
await client_stream.send({
'fsp_update': {
'key': dst_shm_token,
'first': dst._first.value,
'last': dst._last.value,
}})
return tracker, index
def is_synced(

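(Hypothetical chart-side handling of the new `fsp_update` msg sent by `cascade()` above; only the msg shape comes from this diff, `trigger_update` stands in for the `piker.ui._display.trigger_update()` hook it mentions:)

async def handle_fsp_msgs_sketch(client_stream, trigger_update):
    async for msg in client_stream:
        update = msg.get('fsp_update')
        if update:
            # the dst shm token plus the written (first, last) range
            # tells the UI which fsp flow changed and by how much
            trigger_update(
                update['key'],
                update['first'],
                update['last'],
            )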
View File

@ -223,8 +223,9 @@ class DynamicDateAxis(Axis):
) -> list[str]:
chart = self.linkedsplits.chart
bars = chart._arrays[chart.name]
shm = self.linkedsplits.chart._shm
flow = chart._flows[chart.name]
shm = flow.shm
bars = shm.array
first = shm._first.value
bars_len = len(bars)

View File

@ -34,9 +34,7 @@ from PyQt5.QtWidgets import (
QVBoxLayout,
QSplitter,
)
import msgspec
import numpy as np
# from pydantic import BaseModel
import pyqtgraph as pg
import trio
@ -49,6 +47,7 @@ from ._cursor import (
Cursor,
ContentsLabel,
)
from ..data._sharedmem import ShmArray
from ._l1 import L1Labels
from ._ohlc import BarItems
from ._curve import FastAppendCurve
@ -60,15 +59,12 @@ from ._style import (
)
from ..data.feed import Feed
from ..data._source import Symbol
from ..data._sharedmem import (
ShmArray,
# _Token,
)
from ..log import get_logger
from ._interaction import ChartView
from ._forms import FieldsForm
from .._profile import pg_profile_enabled, ms_slower_then
from ._overlay import PlotItemOverlay
from ._flows import Flow
if TYPE_CHECKING:
from ._display import DisplayState
@ -419,7 +415,7 @@ class LinkedSplits(QWidget):
self,
symbol: Symbol,
array: np.ndarray,
shm: ShmArray,
sidepane: FieldsForm,
style: str = 'bar',
@ -444,7 +440,7 @@ class LinkedSplits(QWidget):
self.chart = self.add_plot(
name=symbol.key,
array=array,
shm=shm,
style=style,
_is_main=True,
@ -472,7 +468,7 @@ class LinkedSplits(QWidget):
self,
name: str,
array: np.ndarray,
shm: ShmArray,
array_key: Optional[str] = None,
style: str = 'line',
@ -516,7 +512,6 @@ class LinkedSplits(QWidget):
name=name,
data_key=array_key or name,
array=array,
parent=qframe,
linkedsplits=self,
axisItems=axes,
@ -580,7 +575,7 @@ class LinkedSplits(QWidget):
graphics, data_key = cpw.draw_ohlc(
name,
array,
shm,
array_key=array_key
)
self.cursor.contents_labels.add_label(
@ -594,7 +589,7 @@ class LinkedSplits(QWidget):
add_label = True
graphics, data_key = cpw.draw_curve(
name,
array,
shm,
array_key=array_key,
color='default_light',
)
@ -603,7 +598,7 @@ class LinkedSplits(QWidget):
add_label = True
graphics, data_key = cpw.draw_curve(
name,
array,
shm,
array_key=array_key,
step_mode=True,
color='davies',
@ -691,7 +686,6 @@ class ChartPlotWidget(pg.PlotWidget):
# the "data view" we generate graphics from
name: str,
array: np.ndarray,
data_key: str,
linkedsplits: LinkedSplits,
@ -744,14 +738,6 @@ class ChartPlotWidget(pg.PlotWidget):
self._max_l1_line_len: float = 0
# self.setViewportMargins(0, 0, 0, 0)
# self._ohlc = array # readonly view of ohlc data
# TODO: move to Aggr above XD
# readonly view of data arrays
self._arrays = {
self.data_key: array,
}
self._graphics = {} # registry of underlying graphics
# registry of overlay curve names
self._flows: dict[str, Flow] = {}
@ -767,7 +753,6 @@ class ChartPlotWidget(pg.PlotWidget):
# show background grid
self.showGrid(x=False, y=True, alpha=0.3)
self.default_view()
self.cv.enable_auto_yrange()
self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem)
@ -816,14 +801,8 @@ class ChartPlotWidget(pg.PlotWidget):
Return a range tuple for the bars present in view.
'''
l, r = self.view_range()
array = self._arrays[self.name]
start, stop = self._xrange = (
array[0]['index'],
array[-1]['index'],
)
lbar = max(l, start)
rbar = min(r, stop)
main_flow = self._flows[self.name]
ifirst, l, lbar, rbar, r, ilast = main_flow.datums_range()
return l, lbar, rbar, r
def curve_width_pxs(
@ -877,40 +856,51 @@ class ChartPlotWidget(pg.PlotWidget):
def default_view(
self,
steps_on_screen: Optional[int] = None
bars_from_y: int = 3000,
) -> None:
'''
Set the view box to the "default" startup view of the scene.
'''
try:
index = self._arrays[self.name]['index']
except IndexError:
log.warning(f'array for {self.name} not loaded yet?')
flow = self._flows.get(self.name)
if not flow:
log.warning(f'`Flow` for {self.name} not loaded yet?')
return
index = flow.shm.array['index']
xfirst, xlast = index[0], index[-1]
l, lbar, rbar, r = self.bars_range()
marker_pos, l1_len = self.pre_l1_xs()
end = xlast + l1_len + 1
view = self.view
if (
rbar < 0
or l < xfirst
or l < 0
or (rbar - lbar) < 6
):
# set fixed bars count on screen that approx includes as
# TODO: set fixed bars count on screen that approx includes as
# many bars as possible before a downsample line is shown.
begin = xlast - round(6116 / 6)
begin = xlast - bars_from_y
view.setXRange(
min=begin,
max=xlast,
padding=0,
)
# re-get range
l, lbar, rbar, r = self.bars_range()
else:
begin = end - (r - l)
# we get the L1 spread label "length" in view coords
# terms now that we've scaled either by user control
# or to the default set of bars as per the immediate block
# above.
marker_pos, l1_len = self.pre_l1_xs()
end = xlast + l1_len + 1
begin = end - (r - l)
# for debugging
# print(
# f'bars range: {brange}\n'
# # f'bars range: {brange}\n'
# f'xlast: {xlast}\n'
# f'marker pos: {marker_pos}\n'
# f'l1 len: {l1_len}\n'
@ -922,14 +912,13 @@ class ChartPlotWidget(pg.PlotWidget):
if self._static_yrange == 'axis':
self._static_yrange = None
view = self.view
view.setXRange(
min=begin,
max=end,
padding=0,
)
view._set_yrange()
self.view.maybe_downsample_graphics()
view._set_yrange()
try:
self.linked.graphics_cycle()
except IndexError:
@ -960,7 +949,7 @@ class ChartPlotWidget(pg.PlotWidget):
def draw_ohlc(
self,
name: str,
data: np.ndarray,
shm: ShmArray,
array_key: Optional[str] = None,
@ -980,19 +969,21 @@ class ChartPlotWidget(pg.PlotWidget):
# the np array buffer to be drawn on next render cycle
self.plotItem.addItem(graphics)
# draw after to allow self.scene() to work...
graphics.draw_from_data(data)
data_key = array_key or name
self._graphics[data_key] = graphics
self._flows[data_key] = Flow(
name=name,
plot=self.plotItem,
_shm=shm,
is_ohlc=True,
graphics=graphics,
)
# TODO: i think we can eventually remove this if
# we write the ``Flow.update_graphics()`` method right?
# draw after to allow self.scene() to work...
graphics.draw_from_data(shm.array)
self._add_sticky(name, bg_color='davies')
return graphics, data_key
@ -1058,7 +1049,7 @@ class ChartPlotWidget(pg.PlotWidget):
self,
name: str,
data: np.ndarray,
shm: ShmArray,
array_key: Optional[str] = None,
overlay: bool = False,
@ -1071,7 +1062,7 @@ class ChartPlotWidget(pg.PlotWidget):
) -> (pg.PlotDataItem, str):
'''
Draw a "curve" (line plot graphics) for the provided data in
the input array ``data``.
the input shm array ``shm``.
'''
color = color or self.pen_color or 'default_light'
@ -1082,6 +1073,7 @@ class ChartPlotWidget(pg.PlotWidget):
data_key = array_key or name
# yah, we wrote our own B)
data = shm.array
curve = FastAppendCurve(
y=data[data_key],
x=data['index'],
@ -1105,16 +1097,14 @@ class ChartPlotWidget(pg.PlotWidget):
# and is disastrous for performance.
# curve.setCacheMode(QtWidgets.QGraphicsItem.ItemCoordinateCache)
# register curve graphics and backing array for name
self._graphics[name] = curve
self._arrays[data_key] = data
pi = pi or self.plotItem
self._flows[data_key] = Flow(
name=name,
plot=pi,
_shm=shm,
is_ohlc=False,
# register curve graphics with this flow
graphics=curve,
)
@ -1175,16 +1165,11 @@ class ChartPlotWidget(pg.PlotWidget):
)
return last
def update_graphics_from_array(
def update_graphics_from_flow(
self,
graphics_name: str,
array: Optional[np.ndarray] = None,
array_key: Optional[str] = None,
use_vr: bool = True,
render: bool = True,
**kwargs,
) -> pg.GraphicsObject:
@ -1192,63 +1177,11 @@ class ChartPlotWidget(pg.PlotWidget):
Update the named internal graphics via its backing ``Flow``.
'''
if array is not None:
assert len(array)
data_key = array_key or graphics_name
if graphics_name not in self._flows:
data_key = self.name
if array is not None:
# write array to internal graphics table
self._arrays[data_key] = array
else:
array = self._arrays[data_key]
# array key and graphics "name" might be different..
graphics = self._graphics[graphics_name]
# compute "in-view" indices
l, lbar, rbar, r = self.bars_range()
indexes = array['index']
ifirst = indexes[0]
ilast = indexes[-1]
lbar_i = max(l, ifirst) - ifirst
rbar_i = min(r, ilast) - ifirst
# TODO: we could do it this way as well no?
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
in_view = array[lbar_i: rbar_i + 1]
if (
not in_view.size
or not render
):
return graphics
if isinstance(graphics, BarItems):
graphics.update_from_array(
array,
in_view,
view_range=(lbar_i, rbar_i) if use_vr else None,
**kwargs,
)
else:
graphics.update_from_array(
x=array['index'],
y=array[data_key],
x_iv=in_view['index'],
y_iv=in_view[data_key],
view_range=(lbar_i, rbar_i) if use_vr else None,
**kwargs
)
return graphics
flow = self._flows[array_key or graphics_name]
return flow.update_graphics(
array_key=array_key,
**kwargs,
)
# def _label_h(self, yhigh: float, ylow: float) -> float:
# # compute contents label "height" in view terms
@ -1295,7 +1228,7 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: this should go onto some sort of
# data-view thinger..right?
ohlc = self._shm.array
ohlc = self._flows[self.name].shm.array
# XXX: not sure why the time is so off here
# looks like we're gonna have to do some fixing..
@ -1335,15 +1268,12 @@ class ChartPlotWidget(pg.PlotWidget):
'''
profiler = pg.debug.Profiler(
msg=f'`{str(self)}.maxmin()` loop cycle for: `{self.name}`',
msg=f'`{str(self)}.maxmin(name={name})`: `{self.name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
ms_threshold=ms_slower_then,
delayed=True,
)
l, lbar, rbar, r = bars_range or self.bars_range()
profiler(f'{self.name} got bars range')
# TODO: here we should instead look up the ``Flow.shm.array``
# and read directly from shm to avoid copying to memory first
# and then reading it again here.
@ -1353,112 +1283,19 @@ class ChartPlotWidget(pg.PlotWidget):
flow is None
):
log.error(f"flow {flow_key} doesn't exist in chart {self.name} !?")
res = 0, 0
key = res = 0, 0
else:
first, l, lbar, rbar, r, last = bars_range or flow.datums_range()
profiler(f'{self.name} got bars range')
key = round(lbar), round(rbar)
res = flow.maxmin(*key)
profiler(f'yrange mxmn: {key} -> {res}')
if res == (None, None):
log.error(
f"{flow_key} no mxmn for bars_range => {key} !?"
)
res = 0, 0
profiler(f'yrange mxmn: {key} -> {res}')
return res
# class FlowsTable(pydantic.BaseModel):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
class Flow(msgspec.Struct): # , frozen=True):
'''
(FinancialSignal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level
access and control.
The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors.
'''
name: str
plot: pg.PlotItem
is_ohlc: bool = False
graphics: pg.GraphicsObject
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
# likely will require serializable token that is used to attach
# to the underlying shm ref after startup?
_shm: Optional[ShmArray] = None # currently, may be filled in "later"
# cache of y-range values per x-range input.
_mxmns: dict[tuple[int, int], tuple[float, float]] = {}
@property
def shm(self) -> ShmArray:
return self._shm
@shm.setter
def shm(self, shm: ShmArray) -> ShmArray:
self._shm = shm
def maxmin(
self,
lbar,
rbar,
) -> tuple[float, float]:
'''
Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar``.
'''
rkey = (lbar, rbar)
cached_result = self._mxmns.get(rkey)
if cached_result:
return cached_result
shm = self.shm
if shm is None:
mxmn = None
else: # new block for profiling?..
arr = shm.array
# build relative indexes into shm array
# TODO: should we just add/use a method
# on the shm to do this?
ifirst = arr[0]['index']
slice_view = arr[
lbar - ifirst:
(rbar - ifirst) + 1
]
if not slice_view.size:
mxmn = None
else:
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else:
view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
mxmn = ylow, yhigh
if mxmn is not None:
# cache new mxmn result
self._mxmns[rkey] = mxmn
return mxmn

View File

@ -162,7 +162,7 @@ def ohlc_to_m4_line(
flat,
px_width=px_width,
uppx=uppx,
log_scale=bool(uppx)
# log_scale=bool(uppx)
)
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array([-0.43, 0, 0, 0.43])).flatten()
@ -181,7 +181,8 @@ def ds_m4(
# in display-device-local pixel units.
px_width: int,
uppx: Optional[float] = None,
log_scale: bool = True,
xrange: Optional[float] = None,
# log_scale: bool = True,
) -> tuple[int, np.ndarray, np.ndarray]:
'''
@ -210,50 +211,77 @@ def ds_m4(
# optionally log-scale down the "supposed pxs on screen"
# as the units-per-px (uppx) gets large.
if log_scale:
assert uppx, 'You must provide a `uppx` value to use log scaling!'
# if log_scale:
# assert uppx, 'You must provide a `uppx` value to use log scaling!'
# # uppx = uppx * math.log(uppx, 2)
# scaler = 2**7 / (1 + math.log(uppx, 2))
scaler = round(
max(
# NOTE: found that a 16x px width brought greater
# detail, likely due to dpi scaling?
# px_width=px_width * 16,
2**7 / (1 + math.log(uppx, 2)),
1
)
)
px_width *= scaler
# # scaler = 2**7 / (1 + math.log(uppx, 2))
# scaler = round(
# max(
# # NOTE: found that a 16x px width brought greater
# # detail, likely due to dpi scaling?
# # px_width=px_width * 16,
# 2**7 / (1 + math.log(uppx, 2)),
# 1
# )
# )
# px_width *= scaler
assert px_width > 1 # width of screen in pxs?
# else:
# px_width *= 16
# should never get called unless actually needed
assert px_width > 1 and uppx > 0
# NOTE: if we didn't pre-slice the data to downsample
# you could in theory pass these as the slicing params,
# do we care though since we can always just pre-slice the
# input?
x_start = x[0] # x value start/lowest in domain
x_end = x[-1] # x end value/highest in domain
if xrange is None:
x_end = x[-1] # x end value/highest in domain
xrange = (x_end - x_start)
# XXX: always round up on the input pixels
px_width = math.ceil(px_width)
# lnx = len(x)
# uppx *= max(4 / (1 + math.log(uppx, 2)), 1)
x_range = x_end - x_start
pxw = math.ceil(xrange / uppx)
# px_width = math.ceil(px_width)
# ratio of indexed x-value to width of raster in pixels.
# this is more or less, uppx: units-per-pixel.
w = x_range / float(px_width)
# w = xrange / float(px_width)
# uppx = uppx * math.log(uppx, 2)
# w2 = px_width / uppx
# scale up the width as the uppx gets large
w = uppx # * math.log(uppx, 666)
# ensure we make more than enough
# frames (windows) for the output pixel
frames = px_width
frames = pxw
# if we have more than an exact integer's
# (uniform quotient output) worth of datum-domain-points
# per window-frame, add one more window to ensure
# we have room for all output down-samples.
pts_per_pixel, r = divmod(len(x), frames)
pts_per_pixel, r = divmod(xrange, frames)
if r:
# while r:
frames += 1
pts_per_pixel, r = divmod(xrange, frames)
# print(
# f'uppx: {uppx}\n'
# f'xrange: {xrange}\n'
# f'px_width: {px_width}\n'
# f'pxw: {pxw}\n'
# f'WTF w:{w}, w2:{w2}\n'
# f'frames: {frames}\n'
# )
assert frames >= (xrange / uppx)
# call into ``numba``
nb, i_win, y_out = _m4(
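A minimal sketch of the frame-count logic above, pulled out as a standalone helper (the name ``m4_frame_count`` is hypothetical; only the divmod loop mirrors the code):

import math

def m4_frame_count(xrange: float, uppx: float) -> int:
    # one output frame (window) per `uppx` x-units, rounded up
    frames = math.ceil(xrange / uppx)
    pts_per_pixel, r = divmod(xrange, frames)
    if r:
        # a remainder means one window's worth of datums wouldn't
        # fit, so add a frame to make room for all down-samples
        frames += 1
    assert frames >= (xrange / uppx)
    return frames

# eg. 10_000 x-units at 16 units-per-pixel -> 625 frames
assert m4_frame_count(10_000, 16) == 625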

View File

@ -43,8 +43,8 @@ log = get_logger(__name__)
# latency (in terms of perceived lag in cross hair) so really be sure
# there's an improvement if you want to change it!
_mouse_rate_limit = 120 # TODO; should we calc current screen refresh rate?
_debounce_delay = 1 / 40
_mouse_rate_limit = 60 # TODO; should we calc current screen refresh rate?
_debounce_delay = 0
_ch_label_opac = 1
@ -254,13 +254,13 @@ class ContentsLabels:
def update_labels(
self,
index: int,
# array_name: str,
) -> None:
# for name, (label, update) in self._labels.items():
for chart, name, label, update in self._labels:
array = chart._arrays[name]
flow = chart._flows[name]
array = flow.shm.array
if not (
index >= 0
and index < array[-1]['index']
@ -269,8 +269,6 @@ class ContentsLabels:
print('WTF out of range?')
continue
# array = chart._arrays[name]
# call provided update func with data point
try:
label.show()
@ -472,9 +470,12 @@ class Cursor(pg.GraphicsObject):
) -> LineDot:
# if this plot contains curves add line dot "cursors" to denote
# the current sample under the mouse
main_flow = plot._flows[plot.name]
# read out last index
i = main_flow.shm.array[-1]['index']
cursor = LineDot(
curve,
index=plot._arrays[plot.name][-1]['index'],
index=i,
plot=plot
)
plot.addItem(cursor)

View File

@ -34,67 +34,17 @@ from PyQt5.QtCore import (
from .._profile import pg_profile_enabled, ms_slower_then
from ._style import hcolor
from ._compression import (
# ohlc_to_m4_line,
ds_m4,
)
# from ._compression import (
# # ohlc_to_m4_line,
# ds_m4,
# )
from ._pathops import xy_downsample
from ..log import get_logger
log = get_logger(__name__)
def step_path_arrays_from_1d(
x: np.ndarray,
y: np.ndarray,
include_endpoints: bool = False,
) -> (np.ndarray, np.ndarray):
'''
Generate a "step mode" curve aligned with OHLC style bars
such that each segment spans each bar (aka "centered" style).
'''
y_out = y.copy()
x_out = x.copy()
x2 = np.empty(
# the data + 2 endpoints on either end for
# "termination of the path".
(len(x) + 1, 2),
# we want to align with OHLC or other sampling style
# bars likely so we need fractional values
dtype=float,
)
x2[0] = x[0] - 0.5
x2[1] = x[0] + 0.5
x2[1:] = x[:, np.newaxis] + 0.5
# flatten to 1-d
x_out = x2.reshape(x2.size)
# we create a 1d with 2 extra indexes to
# hold the start and (current) end value for the steps
# on either end
y2 = np.empty((len(y), 2), dtype=y.dtype)
y2[:] = y[:, np.newaxis]
y_out = np.empty(
2*len(y) + 2,
dtype=y.dtype
)
# flatten and set 0 endpoints
y_out[1:-1] = y2.reshape(y2.size)
y_out[0] = 0
y_out[-1] = 0
if not include_endpoints:
return x_out[:-1], y_out[:-1]
else:
return x_out, y_out
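A worked example of the output shape from the (now dropped) helper above, assuming three datums with arbitrary values:

import numpy as np

x = np.array([0., 1., 2.])
y = np.array([3., 4., 5.])
x_out, y_out = step_path_arrays_from_1d(x, y, include_endpoints=True)
# x_out -> [-0.5, -0.5, 0.5, 0.5, 1.5, 1.5, 2.5, 2.5]
# y_out -> [ 0. ,  3. , 3. , 4. , 4. , 5. , 5. , 0. ]
# ie. a flat segment spanning each bar, with both ends dropped
# to zero so the path "terminates" at the x-axis.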
_line_styles: dict[str, int] = {
'solid': Qt.PenStyle.SolidLine,
'dash': Qt.PenStyle.DashLine,
@ -119,8 +69,8 @@ class FastAppendCurve(pg.GraphicsObject):
def __init__(
self,
x: np.ndarray,
y: np.ndarray,
x: np.ndarray = None,
y: np.ndarray = None,
*args,
step_mode: bool = False,
@ -137,6 +87,9 @@ class FastAppendCurve(pg.GraphicsObject):
# brutaaalll, see comments within..
self._y = self.yData = y
self._x = self.xData = x
self._vr: Optional[tuple] = None
self._avr: Optional[tuple] = None
self._br = None
self._name = name
self.path: Optional[QtGui.QPainterPath] = None
@ -150,6 +103,7 @@ class FastAppendCurve(pg.GraphicsObject):
# self._xrange: tuple[int, int] = self.dataBounds(ax=0)
self._xrange: Optional[tuple[int, int]] = None
# self._x_iv_range = None
# self._last_draw = time.time()
self._in_ds: bool = False
@ -181,16 +135,12 @@ class FastAppendCurve(pg.GraphicsObject):
# interactions slower (such as zooming) and if so maybe if/when
# we implement a "history" mode for the view we disable this in
# that mode?
if step_mode:
# don't enable caching by default for the case where the
# only thing drawn is the "last" line segment which can
# have a weird artifact where it won't be fully drawn to its
# endpoint (something we saw on trade rate curves)
self.setCacheMode(
QGraphicsItem.DeviceCoordinateCache
)
self.update()
# if step_mode:
# don't enable caching by default for the case where the
# only thing drawn is the "last" line segment which can
# have a weird artifact where it won't be fully drawn to its
# endpoint (something we saw on trade rate curves)
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
# TODO: probably stick this in a new parent
# type which will contain our own version of
@ -225,34 +175,6 @@ class FastAppendCurve(pg.GraphicsObject):
QLineF(lbar, 0, rbar, 0)
).length()
def downsample(
self,
x,
y,
px_width,
uppx,
) -> tuple[np.ndarray, np.ndarray]:
# downsample whenever more than 1 pixel per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y = ds_m4(
x,
y,
px_width=px_width,
uppx=uppx,
log_scale=bool(uppx)
)
x = np.broadcast_to(x[:, None], y.shape)
# x = (x + np.array([-0.43, 0, 0, 0.43])).flatten()
x = (x + np.array([-0.5, 0, 0, 0.5])).flatten()
y = y.flatten()
# presumably?
self._in_ds = True
return x, y
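Per the commit log, this logic now lives in ``._pathops.xy_downsample()`` (imported above); a call-equivalence sketch, assuming the signature carried over unchanged:

# before: x, y = curve.downsample(x, y, px_width, uppx)
# after (assumed signature, mirroring the call sites below):
x, y = xy_downsample(x, y, px_width, uppx)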
def update_from_array(
self,
@ -266,6 +188,10 @@ class FastAppendCurve(pg.GraphicsObject):
view_range: Optional[tuple[int, int]] = None,
profiler: Optional[pg.debug.Profiler] = None,
draw_last: bool = True,
slice_to_head: int = -1,
do_append: bool = True,
should_redraw: bool = False,
) -> QtGui.QPainterPath:
'''
@ -278,80 +204,135 @@ class FastAppendCurve(pg.GraphicsObject):
profiler = profiler or pg.debug.Profiler(
msg=f'FastAppendCurve.update_from_array(): `{self._name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
ms_threshold=ms_slower_then,
)
# flip_cache = False
flip_cache = False
if self._xrange:
istart, istop = self._xrange
else:
self._xrange = istart, istop = x[0], x[-1]
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(istart - x[0])
append_length = int(x[-1] - istop)
# this is the diff-mode, "data"-rendered index
# tracking var..
self._xrange = x[0], x[-1]
# print(f"xrange: {self._xrange}")
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
self.xData = x
self.yData = y
self._x, self._y = x, y
if view_range:
profiler(f'view range slice {view_range}')
# self.xData = x
# self.yData = y
# self._x, self._y = x, y
# downsampling incremental state checking
uppx = self.x_uppx()
px_width = self.px_width()
uppx_diff = (uppx - self._last_uppx)
should_ds = False
should_redraw = False
new_sample_rate = False
should_ds = self._in_ds
showing_src_data = self._in_ds
# should_redraw = False
# by default we only pull data up to the last (current) index
x_out_full = x_out = x[:slice_to_head]
y_out_full = y_out = y[:slice_to_head]
# if a view range is passed, plan to draw the
# source output that's "in view" of the chart.
if view_range and not self._in_ds:
if (
view_range
# and not self._in_ds
# and not prepend_length > 0
):
# print(f'{self._name} vr: {view_range}')
# by default we only pull data up to the last (current) index
x_out, y_out = x_iv[:-1], y_iv[:-1]
x_out, y_out = x_iv[:slice_to_head], y_iv[:slice_to_head]
profiler(f'view range slice {view_range}')
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
# TODO: numba this bish
x_out, y_out = step_path_arrays_from_1d(
x_out,
y_out
vl, vr = view_range
# last_ivr = self._x_iv_range
# ix_iv, iy_iv = self._x_iv_range = (x_iv[0], x_iv[-1])
zoom_or_append = False
last_vr = self._vr
last_ivr = self._avr
if last_vr:
# relative slice indices
lvl, lvr = last_vr
# abs slice indices
al, ar = last_ivr
# append_length = int(x[-1] - istop)
# append_length = int(x_iv[-1] - ar)
# left_change = abs(x_iv[0] - al) >= 1
# right_change = abs(x_iv[-1] - ar) >= 1
if (
# likely a zoom view change
(vr - lvr) > 2 or vl < lvl
# append / prepend update
# we had an append update where the view range
# didn't change but the data-viewed (shifted)
# underneath, so we need to redraw.
# or left_change and right_change and last_vr == view_range
# not (left_change and right_change) and ivr
# (
# or abs(x_iv[ivr] - livr) > 1
):
zoom_or_append = True
# if last_ivr:
# liivl, liivr = last_ivr
if (
view_range != last_vr
and (
append_length > 1
or zoom_or_append
)
profiler('generated step arrays')
):
should_redraw = True
# print("REDRAWING BRUH")
should_redraw = True
profiler('sliced in-view array history')
self._vr = view_range
self._avr = x_iv[0], x_iv[slice_to_head]
# x_last = x_iv[-1]
# y_last = y_iv[-1]
self._last_vr = view_range
# self._last_vr = view_range
# self.disable_cache()
# flip_cache = True
else:
self._xrange = x[0], x[-1]
x_last = x[-1]
y_last = y[-1]
if prepend_length > 0:
should_redraw = True
# check for downsampling conditions
if (
# std m4 downsample conditions
px_width
and uppx_diff >= 4
or uppx_diff <= -3
or self._step_mode and abs(uppx_diff) >= 4
and abs(uppx_diff) >= 1
):
log.info(
f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
)
self._last_uppx = uppx
new_sample_rate = True
showing_src_data = False
should_redraw = True
should_ds = True
elif (
@ -362,54 +343,36 @@ class FastAppendCurve(pg.GraphicsObject):
# source data so we clear our path data in prep
# to generate a new one from original source data.
should_redraw = True
new_sample_rate = True
should_ds = False
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(istart - x[0])
append_length = int(x[-1] - istop)
showing_src_data = True
# no_path_yet = self.path is None
if (
self.path is None
or should_redraw
or should_ds
or new_sample_rate
or prepend_length > 0
):
if (
not view_range
or self._in_ds
):
# by default we only pull data up to the last (current) index
x_out, y_out = x[:-1], y[:-1]
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
x_out, y_out = step_path_arrays_from_1d(
x_out,
y_out,
)
# TODO: numba this bish
profiler('generated step arrays')
if should_redraw:
profiler('path reversion to non-ds')
if self.path:
self.path.clear()
profiler('cleared paths due to `should_redraw=True`')
if self.fast_path:
self.fast_path.clear()
if should_redraw and not should_ds:
if self._in_ds:
log.info(f'DEDOWN -> {self._name}')
profiler('cleared paths due to `should_redraw` set')
if new_sample_rate and showing_src_data:
# if self._in_ds:
log.info(f'DEDOWN -> {self._name}')
self._in_ds = False
elif should_ds and px_width:
x_out, y_out = self.downsample(
elif should_ds and uppx and px_width > 1:
x_out, y_out = xy_downsample(
x_out,
y_out,
px_width,
@ -425,7 +388,10 @@ class FastAppendCurve(pg.GraphicsObject):
finiteCheck=False,
path=self.path,
)
profiler('generated fresh path')
self.prepareGeometryChange()
profiler(
f'generated fresh path. (should_redraw: {should_redraw} should_ds: {should_ds} new_sample_rate: {new_sample_rate})'
)
# profiler(f'DRAW PATH IN VIEW -> {self._name}')
# reserve mem allocs see:
@ -457,32 +423,27 @@ class FastAppendCurve(pg.GraphicsObject):
elif (
append_length > 0
and not view_range
and do_append
and not should_redraw
# and not view_range
):
new_x = x[-append_length - 2:-1]
new_y = y[-append_length - 2:-1]
print(f'{self._name} append len: {append_length}')
new_x = x[-append_length - 2:slice_to_head]
new_y = y[-append_length - 2:slice_to_head]
profiler('sliced append path')
if self._step_mode:
new_x, new_y = step_path_arrays_from_1d(
new_x,
new_y,
)
# [1:] since we don't need the vertical line normally at
# the beginning of the step curve taking the first (x,
# y) point down to the x-axis **because** this is an
# appended path graphic.
new_x = new_x[1:]
new_y = new_y[1:]
profiler(
f'diffed array input, append_length={append_length}'
)
profiler('diffed append arrays')
if should_ds:
new_x, new_y = self.downsample(
new_x,
new_y,
**should_ds,
)
profiler(f'fast path downsample redraw={should_ds}')
# if should_ds:
# new_x, new_y = xy_downsample(
# new_x,
# new_y,
# px_width,
# uppx,
# )
# profiler(f'fast path downsample redraw={should_ds}')
append_path = pg.functions.arrayToQPath(
new_x,
@ -491,12 +452,13 @@ class FastAppendCurve(pg.GraphicsObject):
finiteCheck=False,
path=self.fast_path,
)
profiler('generated append qpath')
if self.use_fpath:
# an attempt at trying to make append-updates faster..
if self.fast_path is None:
self.fast_path = append_path
self.fast_path.reserve(int(6e3))
# self.fast_path.reserve(int(6e3))
else:
self.fast_path.connectPath(append_path)
size = self.fast_path.capacity()
@ -529,16 +491,43 @@ class FastAppendCurve(pg.GraphicsObject):
# self.disable_cache()
# flip_cache = True
if draw_last:
self.draw_last(x, y)
profiler('draw last segment')
# if flip_cache:
# # # XXX: seems to be needed to avoid artifacts (see above).
# self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
# trigger redraw of path
# do update before reverting to cache mode
self.update()
profiler('.update()')
def draw_last(
self,
x: np.ndarray,
y: np.ndarray,
) -> None:
x_last = x[-1]
y_last = y[-1]
# draw the "current" step graphic segment so it lines up with
# the "middle" of the current (OHLC) sample.
if self._step_mode:
self._last_line = QLineF(
x_last - 0.5, 0,
x_last + 0.5, 0,
# x_last, 0,
# x_last, 0,
)
self._last_step_rect = QRectF(
x_last - 0.5, 0,
x_last + 0.5, y_last
# x_last, 0,
# x_last, y_last
)
# print(
# f"path br: {self.path.boundingRect()}",
@ -548,20 +537,10 @@ class FastAppendCurve(pg.GraphicsObject):
else:
self._last_line = QLineF(
x[-2], y[-2],
x[-1], y_last
x_last, y_last
)
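A small worked example of the step-mode geometry drawn above (numbers are illustrative):

# with x_last = 100, y_last = 5.0:
# _last_line      -> QLineF(99.5, 0, 100.5, 0)   (the step baseline)
# _last_step_rect -> QRectF spanning x in [99.5, 100.5], y in [0, 5.0]
# ie. the "current" step is drawn centered on the last sample's index.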
profiler('draw last segment')
# trigger redraw of path
# do update before reverting to cache mode
# self.prepareGeometryChange()
self.update()
profiler('.update()')
# if flip_cache:
# # XXX: seems to be needed to avoid artifacts (see above).
# self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
@ -596,6 +575,10 @@ class FastAppendCurve(pg.GraphicsObject):
# self.disable_cache()
# self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
def reset_cache(self) -> None:
self.disable_cache()
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
def disable_cache(self) -> None:
'''
Disable the use of the pixel coordinate cache and trigger a geo event.
@ -604,7 +587,7 @@ class FastAppendCurve(pg.GraphicsObject):
# XXX: pretty annoying but, without this there's little
# artefacts on the append updates to the curve...
self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
self.prepareGeometryChange()
# self.prepareGeometryChange()
def boundingRect(self):
'''
@ -624,6 +607,7 @@ class FastAppendCurve(pg.GraphicsObject):
'''
hb = self.path.controlPointRect()
# hb = self.path.boundingRect()
hb_size = hb.size()
fp = self.fast_path
@ -632,17 +616,47 @@ class FastAppendCurve(pg.GraphicsObject):
hb_size = fhb.size() + hb_size
# print(f'hb_size: {hb_size}')
# if self._last_step_rect:
# hb_size += self._last_step_rect.size()
# if self._line:
# br = self._last_step_rect.bottomRight()
# tl = QPointF(
# # self._vr[0],
# # hb.topLeft().y(),
# # 0,
# # hb_size.height() + 1
# )
# if self._last_step_rect:
# br = self._last_step_rect.bottomRight()
# else:
# hb_size += QSizeF(1, 1)
w = hb_size.width() + 1
h = hb_size.height() + 1
# br = QPointF(
# self._vr[-1],
# # tl.x() + w,
# tl.y() + h,
# )
br = QRectF(
# top left
# hb.topLeft()
# tl,
QPointF(hb.topLeft()),
# br,
# total size
# QSizeF(hb_size)
# hb_size,
QSizeF(w, h)
)
self._br = br
# print(f'bounding rect: {br}')
return br
@ -657,8 +671,9 @@ class FastAppendCurve(pg.GraphicsObject):
profiler = pg.debug.Profiler(
msg=f'FastAppendCurve.paint(): `{self._name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
ms_threshold=ms_slower_then,
)
self.prepareGeometryChange()
if (
self._step_mode
@ -681,7 +696,7 @@ class FastAppendCurve(pg.GraphicsObject):
if path:
p.drawPath(path)
profiler('.drawPath(path)')
profiler(f'.drawPath(path): {path.capacity()}')
fp = self.fast_path
if fp:

View File

@ -54,16 +54,16 @@ from ._forms import (
mk_order_pane_layout,
)
from .order_mode import open_order_mode
# from .._profile import (
# pg_profile_enabled,
# ms_slower_then,
# )
from .._profile import (
pg_profile_enabled,
ms_slower_then,
)
from ..log import get_logger
log = get_logger(__name__)
# TODO: load this from a config.toml!
_quote_throttle_rate: int = 12 # Hz
_quote_throttle_rate: int = 22 # Hz
# a working tick-type-classes template
@ -96,28 +96,19 @@ def chart_maxmin(
Compute max and min datums "in view" for range limits.
'''
array = ohlcv_shm.array
ifirst = array[0]['index']
last_bars_range = chart.bars_range()
l, lbar, rbar, r = last_bars_range
in_view = array[lbar - ifirst:rbar - ifirst + 1]
out = chart.maxmin()
if not in_view.size:
log.warning('Resetting chart to data')
chart.default_view()
if out is None:
return (last_bars_range, 0, 0, 0)
mx, mn = (
np.nanmax(in_view['high']),
np.nanmin(in_view['low'],)
)
mn, mx = out
mx_vlm_in_view = 0
if vlm_chart:
mx_vlm_in_view = np.max(
in_view['volume']
)
out = vlm_chart.maxmin()
if out:
_, mx_vlm_in_view = out
return (
last_bars_range,
@ -318,6 +309,7 @@ def graphics_update_cycle(
ds: DisplayState,
wap_in_history: bool = False,
trigger_all: bool = False, # flag used by prepend history updates
prepend_update_index: Optional[int] = None,
) -> None:
# TODO: eventually optimize this whole graphics stack with ``numba``
@ -327,9 +319,12 @@ def graphics_update_cycle(
profiler = pg.debug.Profiler(
msg=f'Graphics loop cycle for: `{chart.name}`',
disabled=True, # not pg_profile_enabled(),
gt=1/12 * 1e3,
# gt=ms_slower_then,
delayed=True,
# disabled=not pg_profile_enabled(),
disabled=True,
ms_threshold=ms_slower_then,
# ms_threshold=1/12 * 1e3,
)
# unpack multi-referenced components
@ -340,7 +335,7 @@ def graphics_update_cycle(
vars = ds.vars
tick_margin = vars['tick_margin']
update_uppx = 6
update_uppx = 16
for sym, quote in ds.quotes.items():
@ -374,8 +369,21 @@ def graphics_update_cycle(
l, lbar, rbar, r = brange
mx = mx_in_view + tick_margin
mn = mn_in_view - tick_margin
profiler('maxmin call')
liv = r > i_step # the last datum is in view
profiler('`ds.maxmin()` call')
liv = r >= i_step # the last datum is in view
if (
prepend_update_index is not None
and lbar > prepend_update_index
):
# on a history update (usually from the FSP subsys)
# if the segment of history that is being prepended
# isn't in view, there is no reason to do a graphics
# update.
log.debug('Skipping prepend graphics cycle: frame not in view')
return
# don't real-time "shift" the curve to the
# left unless we get one of the following:
@ -383,7 +391,6 @@ def graphics_update_cycle(
(
i_diff > 0 # no new sample step
and xpx < 4 # chart is zoomed out very far
and r >= i_step # the last datum isn't in view
and liv
)
or trigger_all
@ -392,6 +399,7 @@ def graphics_update_cycle(
# pixel in a curve should show new data based on uppx
# and then iff update curves and shift?
chart.increment_view(steps=i_diff)
profiler('view incremented')
if vlm_chart:
# always update y-label
@ -401,17 +409,16 @@ def graphics_update_cycle(
if (
(
xpx < update_uppx or i_diff > 0
xpx < update_uppx
or i_diff > 0
and liv
)
or trigger_all
):
# TODO: make it so this doesn't have to be called
# once the $vlm is up?
vlm_chart.update_graphics_from_array(
vlm_chart.update_graphics_from_flow(
'volume',
array,
# UGGGh, see ``maxmin()`` impl in `._fsp` for
# the overlayed plotitems... we need a better
# bay to invoke a maxmin per overlay..
@ -424,6 +431,7 @@ def graphics_update_cycle(
# connected to update accompanying overlay
# graphics..
)
profiler('`vlm_chart.update_graphics_from_flow()`')
if (
mx_vlm_in_view != vars['last_mx_vlm']
@ -432,15 +440,21 @@ def graphics_update_cycle(
vlm_chart.view._set_yrange(
yrange=yrange,
)
profiler('`vlm_chart.view._set_yrange()`')
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vars['last_mx_vlm'] = mx_vlm_in_view
for curve_name, flow in vlm_chart._flows.items():
if not flow.render:
continue
update_fsp_chart(
vlm_chart,
flow,
curve_name,
array_key=curve_name,
do_append=xpx < update_uppx,
)
# is this even doing anything?
# (pretty sure it's the real-time
@ -500,9 +514,9 @@ def graphics_update_cycle(
or i_diff > 0
or trigger_all
):
chart.update_graphics_from_array(
chart.update_graphics_from_flow(
chart.name,
array,
do_append=xpx < update_uppx,
)
# iterate in FIFO order per tick-frame
@ -515,8 +529,9 @@ def graphics_update_cycle(
# tick frames to determine the y-range for chart
# auto-scaling.
# TODO: we need a streaming minmax algo here, see def above.
mx = max(price + tick_margin, mx)
mn = min(price - tick_margin, mn)
if liv:
mx = max(price + tick_margin, mx)
mn = min(price - tick_margin, mn)
if typ in clear_types:
@ -539,9 +554,8 @@ def graphics_update_cycle(
if wap_in_history:
# update vwap overlay line
chart.update_graphics_from_array(
chart.update_graphics_from_flow(
'bar_wap',
array,
)
# L1 book label-line updates
@ -557,7 +571,7 @@ def graphics_update_cycle(
if (
label is not None
# and liv
and liv
):
label.update_fields(
{'level': price, 'size': size}
@ -571,7 +585,7 @@ def graphics_update_cycle(
typ in _tick_groups['asks']
# TODO: instead we could check if the price is in the
# y-view-range?
# and liv
and liv
):
l1.ask_label.update_fields({'level': price, 'size': size})
@ -579,7 +593,7 @@ def graphics_update_cycle(
typ in _tick_groups['bids']
# TODO: instead we could check if the price is in the
# y-view-range?
# and liv
and liv
):
l1.bid_label.update_fields({'level': price, 'size': size})
@ -594,6 +608,7 @@ def graphics_update_cycle(
main_vb._ic is None
or not main_vb._ic.is_set()
):
# print(f'updating range due to mxmn')
main_vb._set_yrange(
# TODO: we should probably scale
# the view margin based on the size
@ -604,7 +619,8 @@ def graphics_update_cycle(
yrange=(mn, mx),
)
vars['last_mx'], vars['last_mn'] = mx, mn
# XXX: update this every draw cycle to make L1-always-in-view work.
vars['last_mx'], vars['last_mn'] = mx, mn
# run synchronous update on all linked flows
for curve_name, flow in chart._flows.items():
@ -643,7 +659,7 @@ async def display_symbol_data(
)
# historical data fetch
brokermod = brokers.get_brokermod(provider)
# brokermod = brokers.get_brokermod(provider)
# ohlc_status_done = sbar.open_status(
# 'retrieving OHLC history.. ',
@ -692,32 +708,31 @@ async def display_symbol_data(
# create main OHLC chart
chart = linked.plot_ohlc_main(
symbol,
bars,
ohlcv,
sidepane=pp_pane,
)
chart.default_view()
chart._feeds[symbol.key] = feed
chart.setFocus()
# plot historical vwap if available
wap_in_history = False
if brokermod._show_wap_in_history:
# XXX: FOR SOME REASON THIS IS CAUSING HANGZ!?!
# if brokermod._show_wap_in_history:
if 'bar_wap' in bars.dtype.fields:
wap_in_history = True
chart.draw_curve(
name='bar_wap',
data=bars,
add_label=False,
)
# if 'bar_wap' in bars.dtype.fields:
# wap_in_history = True
# chart.draw_curve(
# name='bar_wap',
# shm=ohlcv,
# color='default_light',
# add_label=False,
# )
# size view to data once at outset
chart.cv._set_yrange()
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
chart._flows[chart.data_key].shm = ohlcv
# NOTE: we must immediately tell Qt to show the OHLC chart
# to avoid a race where the subplots get added/shown to
# the linked set *before* the main price chart!
@ -780,6 +795,5 @@ async def display_symbol_data(
sbar._status_groups[loading_sym_key][1]()
# let the app run.. bby
chart.default_view()
# linked.graphics_cycle()
await trio.sleep_forever()

View File

@ -343,7 +343,7 @@ class SelectRect(QtGui.QGraphicsRectItem):
nbars = ixmx - ixmn + 1
chart = self._chart
data = chart._arrays[chart.name][ixmn:ixmx]
data = chart._flows[chart.name].shm.array[ixmn:ixmx]
if len(data):
std = data['close'].std()

piker/ui/_flows.py (new file, 874 lines, mode 100644)
View File

@ -0,0 +1,874 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
High level streaming graphics primitives.
This is an intermediate layer which associates real-time low latency
graphics primitives with underlying FSP related data structures for fast
incremental update.
'''
from __future__ import annotations
from functools import partial
from typing import (
Optional,
Callable,
)
import msgspec
import numpy as np
from numpy.lib import recfunctions as rfn
import pyqtgraph as pg
from PyQt5.QtGui import QPainterPath
from PyQt5.QtCore import (
# Qt,
QLineF,
# QSizeF,
QRectF,
# QPointF,
)
from ..data._sharedmem import (
ShmArray,
# open_shm_array,
)
from .._profile import (
pg_profile_enabled,
# ms_slower_then,
)
from ._pathops import (
gen_ohlc_qpath,
ohlc_to_line,
to_step_format,
)
from ._ohlc import (
BarItems,
)
from ._curve import (
FastAppendCurve,
)
from ..log import get_logger
log = get_logger(__name__)
# class FlowsTable(msgspec.Struct):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
# @classmethod
# def from_token(
# cls,
# shm_token: tuple[
# str,
# str,
# tuple[str, str],
# ],
# ) -> Renderer:
# shm = attach_shm_array(token)
# return cls(shm)
def rowarr_to_path(
rows_array: np.ndarray,
x_basis: np.ndarray,
flow: Flow,
) -> QPainterPath:
# TODO: we could in theory use ``numba`` to flatten
# if needed?
# to 1d
y = rows_array.flatten()
return pg.functions.arrayToQPath(
# these get passed at render call time
x=x_basis[:y.size],
y=y,
connect='all',
finiteCheck=False,
path=flow.path,
)
def render_baritems(
flow: Flow,
graphics: BarItems,
read: tuple[
int, int, np.ndarray,
int, int, np.ndarray,
],
profiler: pg.debug.Profiler,
**kwargs,
) -> None:
'''
Graphics management logic for a ``BarItems`` object.
Mostly just logic to determine when and how to downsample an OHLC
lines curve into a flattened line graphic and when to display one
graphic or the other.
TODO: this should likely be moved into some kind of better abstraction
layer, if not a `Renderer` then something just above it?
'''
(
xfirst, xlast, array,
ivl, ivr, in_view,
) = read
# if no source data renderer exists create one.
self = flow
r = self._src_r
if not r:
# OHLC bars path renderer
r = self._src_r = Renderer(
flow=self,
# TODO: rename this to something with ohlc
draw_path=gen_ohlc_qpath,
last_read=read,
)
ds_curve_r = Renderer(
flow=self,
# just swap in the flat view
# data_t=lambda array: self.gy.array,
last_read=read,
draw_path=partial(
rowarr_to_path,
x_basis=None,
),
)
curve = FastAppendCurve(
name='OHLC',
color=graphics._color,
)
curve.hide()
self.plot.addItem(curve)
# baseline "line" downsampled OHLC curve that should
# kick on only when we reach a certain uppx threshold.
self._render_table[0] = (
ds_curve_r,
curve,
)
dsc_r, curve = self._render_table[0]
# do checks for whether or not we require downsampling:
# - if we're **not** downsampling then we simply want to
# render the bars graphics curve and update..
# - if instead we are in a downsampling state then we need to
# (incrementally) update the flattened line curve and flip
# between the two graphics when the uppx threshold is crossed.
x_gt = 6
uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
if (
should_line
and uppx < x_gt
):
print('FLIPPING TO BARS')
should_line = False
elif (
not should_line
and uppx >= x_gt
):
print('FLIPPING TO LINE')
should_line = True
profiler(f'ds logic complete line={should_line}')
# do graphics updates
if should_line:
fields = ['open', 'high', 'low', 'close']
if self.gy is None:
# create a flattened view onto the OHLC array
# which can be read as a line-style format
shm = self.shm
(
self._iflat_first,
self._iflat_last,
self.gx,
self.gy,
) = ohlc_to_line(
shm,
fields=fields,
)
# print(f'unstruct diff: {time.time() - start}')
gy = self.gy
# update flatted ohlc copy
(
iflat_first,
iflat,
ishm_last,
ishm_first,
) = (
self._iflat_first,
self._iflat_last,
self.shm._last.value,
self.shm._first.value
)
# check for shm prepend updates since last read.
if iflat_first != ishm_first:
# write newly prepended data to flattened copy
gy[
ishm_first:iflat_first
] = rfn.structured_to_unstructured(
self.shm._array[fields][ishm_first:iflat_first]
)
self._iflat_first = ishm_first
to_update = rfn.structured_to_unstructured(
self.shm._array[iflat:ishm_last][fields]
)
gy[iflat:ishm_last][:] = to_update
profiler('updated ustruct OHLC data')
# slice out up-to-last step contents
y_flat = gy[ishm_first:ishm_last]
x_flat = self.gx[ishm_first:ishm_last]
# update local last-index tracking
self._iflat_last = ishm_last
# reshape to 1d for graphics rendering
y = y_flat.reshape(-1)
x = x_flat.reshape(-1)
profiler('flattened ustruct OHLC data')
# do all the same for only in-view data
y_iv_flat = y_flat[ivl:ivr]
x_iv_flat = x_flat[ivl:ivr]
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
profiler('flattened ustruct in-view OHLC data')
# pass into curve graphics processing
curve.update_from_array(
x,
y,
x_iv=x_iv,
y_iv=y_iv,
view_range=(ivl, ivr), # hack
profiler=profiler,
# should_redraw=False,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs,
)
curve.show()
profiler('updated ds curve')
else:
# render incremental or in-view update
# and apply output (path) to graphics.
path, last = r.render(
read,
only_in_view=True,
)
graphics.path = path
graphics.draw_last(last)
# NOTE: on appends we used to have to flip the coords
# cache thought it doesn't seem to be required any more?
# graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# graphics.prepareGeometryChange()
graphics.update()
if (
not in_line
and should_line
):
# change to line graphic
log.info(
f'downsampling to line graphic {self.name}'
)
graphics.hide()
# graphics.update()
curve.show()
curve.update()
elif in_line and not should_line:
log.info(f'showing bars graphic {self.name}')
curve.hide()
graphics.show()
graphics.update()
# update our pre-downsample-ready data and then pass that
# new data the downsampler algo for incremental update.
# graphics.update_from_array(
# array,
# in_view,
# view_range=(ivl, ivr) if use_vr else None,
# **kwargs,
# )
# generate and apply path to graphics obj
# graphics.path, last = r.render(
# read,
# only_in_view=True,
# )
# graphics.draw_last(last)
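The bars<->line flip above is a simple hysteresis on uppx; a distilled sketch (``x_gt`` is the threshold from the function body, the helper name is made up):

def should_show_line(
    currently_line: bool,
    uppx: float,
    x_gt: float = 6,   # threshold lifted from above
) -> bool:
    # zoomed back in far enough -> flip to bars
    if currently_line and uppx < x_gt:
        return False
    # zoomed out past the threshold -> flip to the flattened line
    if not currently_line and uppx >= x_gt:
        return True
    return currently_line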
class Flow(msgspec.Struct): # , frozen=True):
'''
(Financial Signal-)Flow compound type which wraps a real-time
shm array stream with displayed graphics (curves, charts)
for high level access and control as well as efficient incremental
update.
The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors.
'''
name: str
plot: pg.PlotItem
graphics: pg.GraphicsObject
_shm: ShmArray
is_ohlc: bool = False
render: bool = True # toggle for display loop
gy: Optional[ShmArray] = None
gx: Optional[np.ndarray] = None
_iflat_last: int = 0
_iflat_first: int = 0
_last_uppx: float = 0
_in_ds: bool = False
_graphics_tranform_fn: Optional[Callable[ShmArray, np.ndarray]] = None
# map from uppx -> (downsampled data, incremental graphics)
_src_r: Optional[Renderer] = None
_render_table: dict[
Optional[int],
tuple[Renderer, pg.GraphicsItem],
] = {}
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
# likely will require serializable token that is used to attach
# to the underlying shm ref after startup?
# _shm: Optional[ShmArray] = None # currently, may be filled in "later"
# last read from shm (usually due to an update call)
_last_read: Optional[np.ndarray] = None
# cache of y-range values per x-range input.
_mxmns: dict[tuple[int, int], tuple[float, float]] = {}
@property
def shm(self) -> ShmArray:
return self._shm
# TODO: remove this and only allow setting through
# private ``._shm`` attr?
@shm.setter
def shm(self, shm: ShmArray) -> ShmArray:
print(f'{self.name} DO NOT SET SHM THIS WAY!?')
self._shm = shm
def maxmin(
self,
lbar,
rbar,
) -> tuple[float, float]:
'''
Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar``.
'''
rkey = (lbar, rbar)
cached_result = self._mxmns.get(rkey)
if cached_result:
return cached_result
shm = self.shm
if shm is None:
mxmn = None
else: # new block for profiling?..
arr = shm.array
# build relative indexes into shm array
# TODO: should we just add/use a method
# on the shm to do this?
ifirst = arr[0]['index']
slice_view = arr[
lbar - ifirst:
(rbar - ifirst) + 1
]
if not slice_view.size:
mxmn = None
else:
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else:
view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
mxmn = ylow, yhigh
if mxmn is not None:
# cache new mxmn result
self._mxmns[rkey] = mxmn
return mxmn
def view_range(self) -> tuple[int, int]:
'''
Return the indexes in view for the associated
plot displaying this flow's data.
'''
vr = self.plot.viewRect()
return int(vr.left()), int(vr.right())
def datums_range(self) -> tuple[
int, int, int, int, int, int
]:
'''
Return a range tuple for the datums present in view.
'''
l, r = self.view_range()
# TODO: avoid this and have shm passed
# in earlier.
if self.shm is None:
# haven't initialized the flow yet
return (0, l, 0, 0, r, 0)
array = self.shm.array
index = array['index']
start = index[0]
end = index[-1]
lbar = max(l, start)
rbar = min(r, end)
return (
start, l, lbar, rbar, r, end,
)
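A worked example of the clamp, with a view that hangs off both edges of the data (numbers assumed):

# view edges l=95, r=160 over data indices [100, 150]:
# lbar = max(95, 100)  -> 100
# rbar = min(160, 150) -> 150
# datums_range() -> (100, 95, 100, 150, 160, 150)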
def read(self) -> tuple[
int, int, np.ndarray,
int, int, np.ndarray,
]:
# read call
array = self.shm.array
indexes = array['index']
ifirst = indexes[0]
ilast = indexes[-1]
ifirst, l, lbar, rbar, r, ilast = self.datums_range()
# get read-relative indices adjusting
# for master shm index.
lbar_i = max(l, ifirst) - ifirst
rbar_i = min(r, ilast) - ifirst
# TODO: we could do it this way as well no?
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
in_view = array[lbar_i: rbar_i + 1]
return (
# abs indices + full data set
ifirst, ilast, array,
# relative indices + in view datums
lbar_i, rbar_i, in_view,
)
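And the follow-on relative-index math in ``read()``, continuing the same numbers:

# ifirst=100 maps to row 0 of the shm array, so:
# lbar_i = max(95, 100) - 100  -> 0
# rbar_i = min(160, 150) - 100 -> 50
# in_view = array[0:51]   # every datum, since the view covers it all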
def update_graphics(
self,
use_vr: bool = True,
render: bool = True,
array_key: Optional[str] = None,
profiler: Optional[pg.debug.Profiler] = None,
**kwargs,
) -> pg.GraphicsObject:
'''
Read latest datums from shm and (incrementally) render to
graphics.
'''
# profiler = profiler or pg.debug.Profiler(
profiler = pg.debug.Profiler(
msg=f'Flow.update_graphics() for {self.name}',
disabled=not pg_profile_enabled(),
# disabled=False,
ms_threshold=4,
# ms_threshold=ms_slower_then,
)
# shm read and slice to view
read = (
xfirst, xlast, array,
ivl, ivr, in_view,
) = self.read()
profiler('read src shm data')
if (
not in_view.size
or not render
):
return self.graphics
graphics = self.graphics
if isinstance(graphics, BarItems):
render_baritems(
self,
graphics,
read,
profiler,
**kwargs,
)
else:
# ``FastAppendCurve`` case:
array_key = array_key or self.name
uppx = graphics.x_uppx()
profiler(f'read uppx {uppx}')
if graphics._step_mode and self.gy is None:
shm = self.shm
(
self._iflat_first,
self.gx,
self.gy,
) = to_step_format(
shm,
array_key,
)
profiler('generated step mode data')
if graphics._step_mode:
(
iflat_first,
iflat,
ishm_last,
ishm_first,
) = (
self._iflat_first,
self._iflat_last,
self.shm._last.value,
self.shm._first.value
)
il = max(iflat - 1, 0)
profiler('read step mode incr update indices')
# check for shm prepend updates since last read.
if iflat_first != ishm_first:
print(f'prepend {array_key}')
# i_prepend = self.shm._array['index'][
# ishm_first:iflat_first]
y_prepend = self.shm._array[array_key][
ishm_first:iflat_first
]
y2_prepend = np.broadcast_to(
y_prepend[:, None], (y_prepend.size, 2),
)
# write newly prepended data to flattened copy
self.gy[ishm_first:iflat_first] = y2_prepend
self._iflat_first = ishm_first
profiler('prepended step mode history')
append_diff = ishm_last - iflat
if append_diff:
# slice up to the last datum since last index/append update
# new_x = self.shm._array[il:ishm_last]['index']
new_y = self.shm._array[il:ishm_last][array_key]
new_y2 = np.broadcast_to(
new_y[:, None], (new_y.size, 2),
)
self.gy[il:ishm_last] = new_y2
profiler('updated step curve data')
# print(
# f'append size: {append_diff}\n'
# f'new_x: {new_x}\n'
# f'new_y: {new_y}\n'
# f'new_y2: {new_y2}\n'
# f'new gy: {gy}\n'
# )
# update local last-index tracking
self._iflat_last = ishm_last
# slice out up-to-last step contents
x_step = self.gx[ishm_first:ishm_last+2]
# shape to 1d
x = x_step.reshape(-1)
profiler('sliced step x')
y_step = self.gy[ishm_first:ishm_last+2]
lasts = self.shm.array[['index', array_key]]
last = lasts[array_key][-1]
y_step[-1] = last
# shape to 1d
y = y_step.reshape(-1)
# s = 6
# print(f'lasts: {x[-2*s:]}, {y[-2*s:]}')
profiler('sliced step y')
# do all the same for only in-view data
ys_iv = y_step[ivl:ivr+1]
xs_iv = x_step[ivl:ivr+1]
y_iv = ys_iv.reshape(ys_iv.size)
x_iv = xs_iv.reshape(xs_iv.size)
# print(
# f'ys_iv : {ys_iv[-s:]}\n'
# f'y_iv: {y_iv[-s:]}\n'
# f'xs_iv: {xs_iv[-s:]}\n'
# f'x_iv: {x_iv[-s:]}\n'
# )
profiler('sliced in view step data')
# legacy full-recompute-everytime method
# x, y = ohlc_flatten(array)
# x_iv, y_iv = ohlc_flatten(in_view)
# profiler('flattened OHLC data')
x_last = array['index'][-1]
y_last = array[array_key][-1]
graphics._last_line = QLineF(
x_last - 0.5, 0,
x_last + 0.5, 0,
)
graphics._last_step_rect = QRectF(
x_last - 0.5, 0,
x_last + 0.5, y_last,
)
# graphics.update()
graphics.update_from_array(
x=x,
y=y,
x_iv=x_iv,
y_iv=y_iv,
view_range=(ivl, ivr) if use_vr else None,
draw_last=False,
slice_to_head=-2,
should_redraw=bool(append_diff),
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
profiler=profiler,
**kwargs
)
profiler('updated step mode curve')
# graphics.reset_cache()
# print(
# f"path br: {graphics.path.boundingRect()}\n",
# # f"fast path br: {graphics.fast_path.boundingRect()}",
# f"last rect br: {graphics._last_step_rect}\n",
# f"full br: {graphics._br}\n",
# )
else:
x = array['index']
y = array[array_key]
x_iv = in_view['index']
y_iv = in_view[array_key]
profiler('sliced input arrays')
# graphics.draw_last(x, y)
graphics.update_from_array(
x=x,
y=y,
x_iv=x_iv,
y_iv=y_iv,
view_range=(ivl, ivr) if use_vr else None,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
profiler=profiler,
**kwargs
)
profiler('`graphics.update_from_array()` complete')
return graphics
class Renderer(msgspec.Struct):
flow: Flow
# called to render path graphics
draw_path: Callable[np.ndarray, QPainterPath]
# called on input data but before any graphics format
# conversions or processing.
data_t: Optional[Callable[ShmArray, np.ndarray]] = None
data_t_shm: Optional[ShmArray] = None
# called on the final data (transform) output to convert
# to "graphical data form" a format that can be passed to
# the ``.draw()`` implementation.
graphics_t: Optional[Callable[ShmArray, np.ndarray]] = None
graphics_t_shm: Optional[ShmArray] = None
# path graphics update implementation methods
prepend_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
append_fn: Optional[Callable[QPainterPath, QPainterPath]] = None
# last array view read
last_read: Optional[np.ndarray] = None
# output graphics rendering, the main object
# processed in ``QGraphicsObject.paint()``
path: Optional[QPainterPath] = None
# def diff(
# self,
# latest_read: tuple[np.ndarray],
# ) -> tuple[np.ndarray]:
# # blah blah blah
# # do diffing for prepend, append and last entry
# return (
# to_prepend
# to_append
# last,
# )
def render(
self,
new_read,
# only render datums "in view" of the ``ChartView``
only_in_view: bool = False,
) -> list[QPainterPath]:
'''
Render the current graphics path(s)
There are (at least) 3 stages from source data to graphics data:
- a data transform (which can be stored in additional shm)
- a graphics transform which converts discrete basis data to
a `float`-basis view-coords graphics basis. (eg. ``ohlc_flatten()``,
``step_path_arrays_from_1d()``, etc.)
- final conversion to a ``QPainterPath`` (eg. via
``pg.functions.arrayToQPath()``) applied to the graphics object.
'''
# do full source data render to path
(
xfirst, xlast, array,
ivl, ivr, in_view,
) = self.last_read
if only_in_view:
array = in_view
# # get latest data from flow shm
# self.last_read = (
# xfirst, xlast, array, ivl, ivr, in_view
# ) = new_read
if self.path is None or only_in_view:
# redraw the entire source data if we have either of:
# - no prior path graphic rendered or,
# - we always intend to re-render the data only in view
# data transform: convert source data to a format
# expected to be incrementally updated and later rendered
# to a more graphics native format.
if self.data_t:
array = self.data_t(array)
# maybe allocate shm for data transform output
# if self.data_t_shm is None:
# fshm = self.flow.shm
# shm, opened = maybe_open_shm_array(
# f'{self.flow.name}_data_t',
# # TODO: create entry for each time frame
# dtype=array.dtype,
# readonly=False,
# )
# assert opened
# shm.push(array)
# self.data_t_shm = shm
elif self.path:
print(f'incremental update not supported yet {self.flow.name}')
# TODO: do incremental update
# prepend, append, last = self.diff(self.flow.read())
# do path generation for each segment
# and then push into graphics object.
hist, last = array[:-1], array[-1]
# call path render func on history
self.path = self.draw_path(hist)
self.last_read = new_read
return self.path, last
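A minimal ``draw_path`` callback sketch for a plain close-price line (the field name and wiring are assumptions; only the ``arrayToQPath()`` usage mirrors this diff):

def line_draw_path(hist: np.ndarray) -> QPainterPath:
    # render a 1d close-price curve from a structured OHLC array
    return pg.functions.arrayToQPath(
        hist['index'],
        hist['close'],
        connect='all',
        finiteCheck=False,
    )

# r = Renderer(flow=flow, draw_path=line_draw_path, last_read=flow.read())
# path, last = r.render(flow.read(), only_in_view=True)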

View File

@ -75,6 +75,7 @@ def update_fsp_chart(
flow,
graphics_name: str,
array_key: Optional[str],
**kwargs,
) -> None:
@ -93,10 +94,10 @@ def update_fsp_chart(
# update graphics
# NOTE: this does a length check internally which allows it
# staying above the last row check below..
chart.update_graphics_from_array(
chart.update_graphics_from_flow(
graphics_name,
array,
array_key=array_key or graphics_name,
**kwargs,
)
# XXX: re: ``array_key``: fsp func names must be unique meaning we
@ -106,9 +107,6 @@ def update_fsp_chart(
# read from last calculated value and update any label
last_val_sticky = chart._ysticks.get(graphics_name)
if last_val_sticky:
# array = shm.array[array_key]
# if len(array):
# value = array[-1]
last = last_row[array_key]
last_val_sticky.update_from_data(-1, last)
@ -246,20 +244,18 @@ async def run_fsp_ui(
chart.draw_curve(
name=name,
data=shm.array,
shm=shm,
overlay=True,
color='default_light',
array_key=name,
**conf.get('chart_kwargs', {})
)
# specially store ref to shm for lookup in display loop
chart._flows[name].shm = shm
else:
# create a new sub-chart widget for this fsp
chart = linkedsplits.add_plot(
name=name,
array=shm.array,
shm=shm,
array_key=name,
sidepane=sidepane,
@ -271,12 +267,6 @@ async def run_fsp_ui(
**conf.get('chart_kwargs', {})
)
# XXX: ONLY for sub-chart fsps, overlays have their
# data looked up from the chart's internal array set.
# TODO: we must get a data view api going STAT!!
chart._shm = shm
chart._flows[chart.data_key].shm = shm
# should **not** be the same sub-chart widget
assert chart.name != linkedsplits.chart.name
@ -445,12 +435,16 @@ class FspAdmin:
# wait for graceful shutdown signal
async with stream.subscribe() as stream:
async for msg in stream:
if msg == 'update':
info = msg.get('fsp_update')
if info:
# if the chart isn't hidden try to update
# the data on screen.
if not self.linked.isHidden():
log.info(f'Re-syncing graphics for fsp: {ns_path}')
self.linked.graphics_cycle(trigger_all=True)
self.linked.graphics_cycle(
trigger_all=True,
prepend_update_index=info['first'],
)
else:
log.info(f'received unexpected fsp engine msg: {msg}')
@ -626,7 +620,7 @@ async def open_vlm_displays(
shm = ohlcv
chart = linked.add_plot(
name='volume',
array=shm.array,
shm=shm,
array_key='volume',
sidepane=sidepane,
@ -639,7 +633,6 @@ async def open_vlm_displays(
# the curve item internals are pretty convoluted.
style='step',
)
chart._flows['volume'].shm = ohlcv
# force 0 to always be in view
def maxmin(
@ -666,11 +659,6 @@ async def open_vlm_displays(
# chart.hideAxis('right')
# chart.showAxis('left')
# XXX: ONLY for sub-chart fsps, overlays have their
# data looked up from the chart's internal array set.
# TODO: we must get a data view api going STAT!!
chart._shm = shm
# send back new chart to caller
task_status.started(chart)
@ -685,9 +673,9 @@ async def open_vlm_displays(
last_val_sticky.update_from_data(-1, value)
vlm_curve = chart.update_graphics_from_array(
vlm_curve = chart.update_graphics_from_flow(
'volume',
shm.array,
# shm.array,
)
# size view to data once at outset
@ -795,9 +783,8 @@ async def open_vlm_displays(
color = 'bracket'
curve, _ = chart.draw_curve(
# name='dolla_vlm',
name=name,
data=shm.array,
shm=shm,
array_key=name,
overlay=pi,
color=color,
@ -812,7 +799,6 @@ async def open_vlm_displays(
# ``.draw_curve()``.
flow = chart._flows[name]
assert flow.plot is pi
flow.shm = shm
chart_curves(
fields,
@ -847,7 +833,9 @@ async def open_vlm_displays(
# liquidity events (well at least on low OHLC periods - 1s).
vlm_curve.hide()
chart.removeItem(vlm_curve)
chart._flows.pop('volume')
vflow = chart._flows['volume']
vflow.render = False
# avoid range sorting on volume once disabled
chart.view.disable_auto_yrange()
@ -902,10 +890,10 @@ async def open_vlm_displays(
# built-in vlm fsps
for target, conf in {
tina_vwap: {
'overlay': 'ohlc', # overlays with OHLCV (main) chart
'anchor': 'session',
},
# tina_vwap: {
# 'overlay': 'ohlc', # overlays with OHLCV (main) chart
# 'anchor': 'session',
# },
}.items():
started = await admin.open_fsp_chart(
target,

View File

@ -20,7 +20,6 @@ Chart view box primitives
"""
from __future__ import annotations
from contextlib import asynccontextmanager
# import itertools
import time
from typing import Optional, Callable
@ -35,10 +34,9 @@ import trio
from ..log import get_logger
from .._profile import pg_profile_enabled, ms_slower_then
from ._style import _min_points_to_show
# from ._style import _min_points_to_show
from ._editors import SelectRect
from . import _event
from ._ohlc import BarItems
log = get_logger(__name__)
@ -486,15 +484,18 @@ class ChartView(ViewBox):
# don't zoom more than the min points setting
l, lbar, rbar, r = chart.bars_range()
vl = r - l
# vl = r - l
if ev.delta() > 0 and vl <= _min_points_to_show:
log.debug("Max zoom bruh...")
return
# if ev.delta() > 0 and vl <= _min_points_to_show:
# log.debug("Max zoom bruh...")
# return
if ev.delta() < 0 and vl >= len(chart._arrays[chart.name]) + 666:
log.debug("Min zoom bruh...")
return
# if (
# ev.delta() < 0
# and vl >= len(chart._flows[chart.name].shm.array) + 666
# ):
# log.debug("Min zoom bruh...")
# return
# actual scaling factor
s = 1.015 ** (ev.delta() * -1 / 20) # self.state['wheelScaleFactor'])
@ -568,6 +569,17 @@ class ChartView(ViewBox):
self._resetTarget()
self.scaleBy(s, focal)
# XXX: without this it seems as though sometimes
# when zooming in from far out (and maybe vice versa?)
# the signal isn't being fired enough since if you pan
# just after you'll see further downsampling code run
# (pretty noticeable on the OHLC ds curve) but with this
# that never seems to happen? Only question is how much this
# "double work" is causing latency when these missing event
# fires don't happen?
self.maybe_downsample_graphics()
self.sigRangeChangedManually.emit(mask)
# self._ic.set()
@ -734,7 +746,7 @@ class ChartView(ViewBox):
# flag to prevent triggering sibling charts from the same linked
# set from recursion errors.
autoscale_linked_plots: bool = True,
autoscale_linked_plots: bool = False,
name: Optional[str] = None,
# autoscale_overlays: bool = False,
@ -748,8 +760,9 @@ class ChartView(ViewBox):
'''
profiler = pg.debug.Profiler(
msg=f'`ChartView._set_yrange()`: `{self.name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
ms_threshold=ms_slower_then,
delayed=True,
)
set_range = True
@ -777,9 +790,15 @@ class ChartView(ViewBox):
# calculate max, min y values in viewable x-range from data.
# Make sure min bars/datums on screen is adhered.
else:
br = bars_range or chart.bars_range()
profiler(f'got bars range: {br}')
# else:
# TODO: eventually we should point to the
# ``FlowsTable`` (or wtv) which should perform
# the group operations?
# flow = chart._flows[name or chart.name]
# br = bars_range or chart.bars_range()
# br = bars_range or chart.bars_range()
# profiler(f'got bars range: {br}')
# TODO: maybe should be a method on the
# chart widget/item?
@ -795,7 +814,7 @@ class ChartView(ViewBox):
# for chart in plots:
# if chart and not chart._static_yrange:
# chart.cv._set_yrange(
# bars_range=br,
# # bars_range=br,
# autoscale_linked_plots=False,
# )
# profiler('autoscaled linked plots')
@ -809,11 +828,12 @@ class ChartView(ViewBox):
if yrange is None:
log.warning(f'No yrange provided for {self.name}!?')
print(f"WTF NO YRANGE {self.name}")
return
ylow, yhigh = yrange
profiler(f'maxmin(): {yrange}')
profiler(f'callback ._maxmin(): {yrange}')
# view margins: stay within a % of the "true range"
diff = yhigh - ylow
@ -830,6 +850,8 @@ class ChartView(ViewBox):
self.setYRange(ylow, yhigh)
profiler(f'set limits: {(ylow, yhigh)}')
profiler.finish()
def enable_auto_yrange(
self,
src_vb: Optional[ChartView] = None,
@ -843,17 +865,9 @@ class ChartView(ViewBox):
if src_vb is None:
src_vb = self
# such that when a linked chart changes its range
# this local view is also automatically changed and
# resized to data.
src_vb.sigXRangeChanged.connect(self._set_yrange)
# splitter(s) resizing
src_vb.sigResized.connect(self._set_yrange)
# mouse wheel doesn't emit XRangeChanged
src_vb.sigRangeChangedManually.connect(self._set_yrange)
# TODO: a smarter way to avoid calling this needlessly?
# 2 things i can think of:
# - register downsample-able graphics specially and only
@ -864,15 +878,16 @@ class ChartView(ViewBox):
self.maybe_downsample_graphics
)
def disable_auto_yrange(
self,
) -> None:
# mouse wheel doesn't emit XRangeChanged
src_vb.sigRangeChangedManually.connect(self._set_yrange)
# self._chart._static_yrange = 'axis'
# src_vb.sigXRangeChanged.connect(self._set_yrange)
# src_vb.sigXRangeChanged.connect(
# self.maybe_downsample_graphics
# )
def disable_auto_yrange(self) -> None:
self.sigXRangeChanged.disconnect(
self._set_yrange,
)
self.sigResized.disconnect(
self._set_yrange,
)
@ -883,6 +898,11 @@ class ChartView(ViewBox):
self._set_yrange,
)
# self.sigXRangeChanged.disconnect(self._set_yrange)
# self.sigXRangeChanged.disconnect(
# self.maybe_downsample_graphics
# )
def x_uppx(self) -> float:
'''
Return the "number of x units" within a single
@ -890,7 +910,7 @@ class ChartView(ViewBox):
graphics items which are our children.
'''
graphics = list(self._chart._graphics.values())
graphics = [f.graphics for f in self._chart._flows.values()]
if not graphics:
return 0
@ -903,23 +923,16 @@ class ChartView(ViewBox):
def maybe_downsample_graphics(self):
uppx = self.x_uppx()
if (
# we probably want to drop this once we are "drawing in
# view" for downsampled flows..
uppx and uppx > 16
and self._ic is not None
):
# don't bother updating since we're zoomed out bigly and
# in a pan-interaction, in which case we shouldn't be
# doing view-range based rendering (at least not yet).
# print(f'{uppx} exiting early!')
return
profiler = pg.debug.Profiler(
msg=f'ChartView.maybe_downsample_graphics() for {self.name}',
disabled=not pg_profile_enabled(),
gt=3,
delayed=True,
# XXX: important to avoid not seeing underlying
# ``.update_graphics_from_flow()`` nested profiling likely
# due to the way delaying works and garbage collection of
# the profiler in the delegated method calls.
ms_threshold=6,
# ms_threshold=ms_slower_then,
)
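# (For reference, the delayed-profiler pattern from stock pyqtgraph;
# ``ms_threshold`` is a piker-side extension and so omitted in this
# demo-only sketch:)
def _demo_delayed_profiler() -> None:
    profiler = pg.debug.Profiler(
        msg='_demo_delayed_profiler()',
        disabled=False,
        delayed=True,  # buffer marks until .finish()
    )
    sum(range(10_000))        # stand-in work
    profiler('stage 1 done')  # record a named checkpoint
    sum(range(10_000))        # more stand-in work
    profiler.finish()         # flush buffered timing marks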
# TODO: a faster single-loop-iterator way of doing this XD
@@ -928,19 +941,23 @@ class ChartView(ViewBox):
plots = linked.subplots | {chart.name: chart}
for chart_name, chart in plots.items():
for name, flow in chart._flows.items():
graphics = flow.graphics
use_vr = False
if isinstance(graphics, BarItems):
use_vr = True
if (
not flow.render
# XXX: super important to be aware of this.
# or not flow.graphics.isVisible()
):
continue
# pass in no array which will read and render from the last
# passed array (normally provided by the display loop.)
chart.update_graphics_from_array(
chart.update_graphics_from_flow(
name,
use_vr=use_vr,
profiler=profiler,
)
profiler(f'range change updated {chart_name}:{name}')
use_vr=True,
profiler.finish()
# gets passed down into graphics obj
# profiler=profiler,
)
profiler(f'<{chart_name}>.update_graphics_from_flow({name})')


@@ -25,17 +25,15 @@ from typing import (
import numpy as np
import pyqtgraph as pg
from numba import njit, float64, int64 # , optional
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QLineF, QPointF
# from numba import types as ntypes
# from ..data._source import numba_ohlc_dtype
from .._profile import pg_profile_enabled, ms_slower_then
from ._style import hcolor
from ..log import get_logger
from ._curve import FastAppendCurve
from ._compression import ohlc_flatten
from ._pathops import gen_ohlc_qpath
if TYPE_CHECKING:
from ._chart import LinkedSplits
@@ -46,7 +44,7 @@ log = get_logger(__name__)
def bar_from_ohlc_row(
row: np.ndarray,
w: float
w: float = 0.43
) -> tuple[QLineF]:
'''
@@ -84,118 +82,6 @@ def bar_from_ohlc_row(
return [hl, o, c]
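# (The elided body above builds three ``QLineF`` segments per row,
# inferred from the "last bar" update logic further down:
#
#   hl = QLineF(index, low, index, high)        # vertical body
#   o = QLineF(index - w, open, index, open)    # left arm @ open
#   c = QLineF(index, close, index + w, close)  # right arm @ close
#
# which matches the returned ``[hl, o, c]`` triple.)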
@njit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# ntypes.tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
'''
Generate arrays of line segment coordinates from input ohlc data.
'''
size = int(data.shape[0] * 6)
x = np.zeros(
# data,
shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# TODO: report bug for assert @
# /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
for i, q in enumerate(data[start:], start):
# TODO: ask numba why this doesn't work..
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
open = q['open']
high = q['high']
low = q['low']
close = q['close']
index = float64(q['index'])
istart = i * 6
istop = istart + 6
# x, y detail the 6 points which connect all vertices of an ohlc bar
x[istart:istop] = (
index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# specifies that the first edge is never connected to the
# prior bar's last edge, thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (1, 1, 1, 1, 1, 0)
return x, y, c
def gen_qpath(
data: np.ndarray,
start: int, # XXX: do we need this?
w: float,
path: Optional[QtGui.QPainterPath] = None,
) -> QtGui.QPainterPath:
path_was_none = path is None
profiler = pg.debug.Profiler(
msg='gen_qpath ohlc',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
x, y, c = path_arrays_from_ohlc(
data,
start,
bar_gap=w,
)
profiler("generate stream with numba")
# TODO: numba the internals of this!
path = pg.functions.arrayToQPath(
x,
y,
connect=c,
path=path,
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
profiler("generate path with arrayToQPath")
return path
class BarItems(pg.GraphicsObject):
'''
"Price range" bars graphics rendered from a OHLC sampled sequence.
@@ -243,7 +129,7 @@ class BarItems(pg.GraphicsObject):
self.fast_path = QtGui.QPainterPath()
self._xrange: tuple[int, int]
self._yrange: tuple[float, float]
# self._yrange: tuple[float, float]
self._vrange = None
# TODO: don't render the full backing array each time
@@ -273,17 +159,17 @@ class BarItems(pg.GraphicsObject):
'''
hist, last = ohlc[:-1], ohlc[-1]
self.path = gen_qpath(hist, start, self.w)
self.path = gen_ohlc_qpath(hist, start, self.w)
# save graphics for later reference and keep track
# of current internal "last index"
# self.start_index = len(ohlc)
index = ohlc['index']
self._xrange = (index[0], index[-1])
self._yrange = (
np.nanmax(ohlc['high']),
np.nanmin(ohlc['low']),
)
# self._yrange = (
# np.nanmax(ohlc['high']),
# np.nanmin(ohlc['low']),
# )
# up to last to avoid double draw of last bar
self._last_bar_lines = bar_from_ohlc_row(last, self.w)
@@ -310,7 +196,7 @@ class BarItems(pg.GraphicsObject):
self._pi.addItem(curve)
self._ds_line = curve
self._ds_xrange = (index[0], index[-1])
# self._ds_xrange = (index[0], index[-1])
# trigger render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update
@@ -324,289 +210,43 @@ class BarItems(pg.GraphicsObject):
else:
return 0
def update_from_array(
def draw_last(
self,
# full array input history
ohlc: np.ndarray,
# pre-sliced array data that's "in view"
ohlc_iv: np.ndarray,
view_range: Optional[tuple[int, int]] = None,
profiler: Optional[pg.debug.Profiler] = None,
last: np.ndarray,
) -> None:
'''
Update the last datum's bar graphic from input data array.
# generate new line objects for the updatable "current bar"
self._last_bar_lines = bar_from_ohlc_row(last, self.w)
This routine should be interface compatible with
``pg.PlotCurveItem.setData()``. Normally this method in
``pyqtgraph`` seems to update all the data passed to the
graphics object, and then update/rerender, but here we're
assuming the prior graphics haven't changed (OHLC history rarely
does) so this "should" be simpler and faster.
# last bar update
i, o, h, l, last, v = last[
['index', 'open', 'high', 'low', 'close', 'volume']
]
# assert i == self.start_index - 1
# assert i == last_index
body, larm, rarm = self._last_bar_lines
This routine should be made (transitively) as fast as possible.
# XXX: is there a faster way to modify this?
rarm.setLine(rarm.x1(), last, rarm.x2(), last)
'''
profiler = profiler or pg.debug.Profiler(
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
delayed=True,
)
# writer is responsible for changing open on "first" volume of bar
larm.setLine(larm.x1(), o, larm.x2(), o)
# index = self.start_index
istart, istop = self._xrange
ds_istart, ds_istop = self._ds_xrange
if l != h: # noqa
index = ohlc['index']
first_index, last_index = index[0], index[-1]
if body is None:
body = self._last_bar_lines[0] = QLineF(i, l, i, h)
else:
# update body
body.setLine(i, l, i, h)
# length = len(ohlc)
# prepend_length = istart - first_index
# append_length = last_index - istop
# ds_prepend_length = ds_istart - first_index
# ds_append_length = last_index - ds_istop
flip_cache = False
x_gt = 16
if self._ds_line:
uppx = self._ds_line.x_uppx()
else:
uppx = 0
should_line = self._in_ds
if (
self._in_ds
and uppx < x_gt
):
should_line = False
elif (
not self._in_ds
and uppx >= x_gt
):
should_line = True
profiler('ds logic complete')
if should_line:
# update the line graphic
# x, y = self._ds_line_xy = ohlc_flatten(ohlc_iv)
x, y = self._ds_line_xy = ohlc_flatten(ohlc)
x_iv, y_iv = self._ds_line_xy = ohlc_flatten(ohlc_iv)
profiler('flattening bars to line')
# TODO: we should be diffing the amount of new data which
# needs to be downsampled. Ideally we actually are just
# doing all the ds-ing in sibling actors so that the data
# can just be read and rendered to graphics on events of our
# choice.
# diff = do_diff(ohlc, new_bit)
curve = self._ds_line
curve.update_from_array(
x=x,
y=y,
x_iv=x_iv,
y_iv=y_iv,
view_range=None, # hack
profiler=profiler,
)
profiler('updated ds line')
if not self._in_ds:
# hide bars and show line
self.hide()
# XXX: is this actually any faster?
# self._pi.removeItem(self)
# TODO: a `.ui()` log level?
log.info(
f'downsampling to line graphic {self._name}'
)
# self._pi.addItem(curve)
curve.show()
curve.update()
self._in_ds = True
# stop here since we don't need to update bars path any more
# as we delegate to the downsample line with updates.
profiler.finish()
# print('terminating early')
return
else:
# we should be in bars mode
if self._in_ds:
# flip back to bars graphics and hide the downsample line.
log.info(f'showing bars graphic {self._name}')
curve = self._ds_line
curve.hide()
# self._pi.removeItem(curve)
# XXX: is this actually any faster?
# self._pi.addItem(self)
self.show()
self._in_ds = False
# generate in_view path
self.path = gen_qpath(
ohlc_iv,
0,
self.w,
# path=self.path,
)
# TODO: to make the downsampling faster
# - allow mapping only a range of lines thus only drawing as
# many bars as exactly specified.
# - move ohlc "flattening" to a shmarr
# - maybe move all this embedded logic to a higher
# level type?
# if prepend_length:
# # new history was added and we need to render a new path
# prepend_bars = ohlc[:prepend_length]
# if ds_prepend_length:
# ds_prepend_bars = ohlc[:ds_prepend_length]
# pre_x, pre_y = ohlc_flatten(ds_prepend_bars)
# fx = np.concatenate((pre_x, fx))
# fy = np.concatenate((pre_y, fy))
# profiler('ds line prepend diff complete')
# if append_length:
# # generate new graphics to match provided array
# # path appending logic:
# # we need to get the previous "current bar(s)" for the time step
# # and convert it to a sub-path to append to the historical set
# # new_bars = ohlc[istop - 1:istop + append_length - 1]
# append_bars = ohlc[-append_length - 1:-1]
# # print(f'ohlc bars to append size: {append_bars.size}\n')
# if ds_append_length:
# ds_append_bars = ohlc[-ds_append_length - 1:-1]
# post_x, post_y = ohlc_flatten(ds_append_bars)
# print(
# f'ds curve to append sizes: {(post_x.size, post_y.size)}'
# )
# fx = np.concatenate((fx, post_x))
# fy = np.concatenate((fy, post_y))
# profiler('ds line append diff complete')
profiler('array diffs complete')
# does this work?
last = ohlc[-1]
# fy[-1] = last['close']
# # incremental update and cache line datums
# self._ds_line_xy = fx, fy
# maybe downsample to line
# ds = self.maybe_downsample()
# if ds:
# # if we downsample to a line don't bother with
# # any more path generation / updates
# self._ds_xrange = first_index, last_index
# profiler('downsampled to line')
# return
# print(in_view.size)
# if self.path:
# self.path = path
# self.path.reserve(path.capacity())
# self.path.swap(path)
# path updates
# if prepend_length:
# # XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# # y value not matching the first value from
# # ohlc[prepend_length + 1] ???
# prepend_path = gen_qpath(prepend_bars, 0, self.w)
# old_path = self.path
# self.path = prepend_path
# self.path.addPath(old_path)
# profiler('path PREPEND')
# if append_length:
# append_path = gen_qpath(append_bars, 0, self.w)
# self.path.moveTo(
# float(istop - self.w),
# float(append_bars[0]['open'])
# )
# self.path.addPath(append_path)
# profiler('path APPEND')
# fp = self.fast_path
# if fp is None:
# self.fast_path = append_path
# else:
# fp.moveTo(
# float(istop - self.w), float(new_bars[0]['open'])
# )
# fp.addPath(append_path)
# self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# flip_cache = True
self._xrange = first_index, last_index
# trigger redraw despite caching
self.prepareGeometryChange()
# generate new line objects for the updatable "current bar"
self._last_bar_lines = bar_from_ohlc_row(last, self.w)
# last bar update
i, o, h, l, last, v = last[
['index', 'open', 'high', 'low', 'close', 'volume']
]
# assert i == self.start_index - 1
# assert i == last_index
body, larm, rarm = self._last_bar_lines
# XXX: is there a faster way to modify this?
rarm.setLine(rarm.x1(), last, rarm.x2(), last)
# writer is responsible for changing open on "first" volume of bar
larm.setLine(larm.x1(), o, larm.x2(), o)
if l != h: # noqa
if body is None:
body = self._last_bar_lines[0] = QLineF(i, l, i, h)
else:
# update body
body.setLine(i, l, i, h)
# XXX: pretty sure this is causing an issue where the bar has
# a large upward move right before the next sample and the body
# is getting set to None since the next bar is flat but the shm
# array index update wasn't read by the time this code runs. Iow
# we're doing this removal of the body for a bar index that is
# now out of date / from some previous sample. It's weird
# though because i've seen it do this to bars i - 3 back?
profiler('last bar set')
self.update()
profiler('.update()')
if flip_cache:
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
profiler.finish()
# XXX: pretty sure this is causing an issue where the bar has
# a large upward move right before the next sample and the body
# is getting set to None since the next bar is flat but the shm
# array index update wasn't read by the time this code runs. Iow
# we're doing this removal of the body for a bar index that is
# now out of date / from some previous sample. It's weird
# though because i've seen it do this to bars i - 3 back?
def boundingRect(self):
# Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
@@ -680,7 +320,7 @@ class BarItems(pg.GraphicsObject):
profiler = pg.debug.Profiler(
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
ms_threshold=ms_slower_then,
)
# p.setCompositionMode(0)


@@ -0,0 +1,256 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Super fast ``QPainterPath`` generation-related operator routines.
"""
from typing import (
Optional,
)
import numpy as np
from numpy.lib import recfunctions as rfn
from numba import njit, float64, int64 # , optional
import pyqtgraph as pg
from PyQt5 import QtGui
# from PyQt5.QtCore import QLineF, QPointF
from ..data._sharedmem import (
ShmArray,
)
from .._profile import pg_profile_enabled, ms_slower_then
from ._compression import (
# ohlc_flatten,
ds_m4,
)
def xy_downsample(
x,
y,
px_width,
uppx,
x_spacer: float = 0.5,
) -> tuple[np.ndarray, np.ndarray]:
# downsample whenever more than 1 pixel per datum can be shown.
# always refresh data bounds until we get diffing
# working properly, see above..
bins, x, y = ds_m4(
x,
y,
px_width=px_width,
uppx=uppx,
# log_scale=bool(uppx)
)
# flatten output to 1d arrays suitable for path-graphics generation.
x = np.broadcast_to(x[:, None], y.shape)
x = (x + np.array(
[-x_spacer, 0, 0, x_spacer]
)).flatten()
y = y.flatten()
return x, y
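# (A numpy-only demo of the flatten step above, assuming ``ds_m4``
# yields one x value per bin and 4 y values per bin, M4 style;
# demo-only helper, not part of this module:)
def _demo_flatten_m4() -> tuple[np.ndarray, np.ndarray]:
    x = np.array([0., 1., 2.])        # one x per bin
    y = np.arange(12.).reshape(3, 4)  # 4 downsampled y's per bin
    x2 = np.broadcast_to(x[:, None], y.shape)
    # spread each bin's 4 points across sub-bin x coords so the
    # rendered curve keeps its min/max extrema visually distinct
    x_flat = (x2 + np.array([-0.5, 0, 0, 0.5])).flatten()
    y_flat = y.flatten()
    assert x_flat.shape == y_flat.shape  # both 1d, size 12
    return x_flat, y_flat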
@njit(
# TODO: for now need to construct this manually for readonly arrays, see
# https://github.com/numba/numba/issues/4511
# ntypes.tuple((float64[:], float64[:], float64[:]))(
# numba_ohlc_dtype[::1], # contiguous
# int64,
# optional(float64),
# ),
nogil=True
)
def path_arrays_from_ohlc(
data: np.ndarray,
start: int64,
bar_gap: float64 = 0.43,
) -> np.ndarray:
'''
Generate arrays of line segment coordinates from input ohlc data.
'''
size = int(data.shape[0] * 6)
x = np.zeros(
# data,
shape=size,
dtype=float64,
)
y, c = x.copy(), x.copy()
# TODO: report bug for assert @
# /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991
for i, q in enumerate(data[start:], start):
# TODO: ask numba why this doesn't work..
# open, high, low, close, index = q[
# ['open', 'high', 'low', 'close', 'index']]
open = q['open']
high = q['high']
low = q['low']
close = q['close']
index = float64(q['index'])
istart = i * 6
istop = istart + 6
# x, y detail the 6 points which connect all vertices of an ohlc bar
x[istart:istop] = (
index - bar_gap,
index,
index,
index,
index,
index + bar_gap,
)
y[istart:istop] = (
open,
open,
low,
high,
close,
close,
)
# specifies that the first edge is never connected to the
# prior bar's last edge, thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (1, 1, 1, 1, 1, 0)
return x, y, c
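# (Worked example: a single bar at index 10 with (o, h, l, c)
# = (1., 2., 0., 1.5) and the default ``bar_gap`` of 0.43 yields:
#
#   x: [9.57, 10., 10., 10., 10., 10.43]
#   y: [1.,   1.,  0.,  2.,  1.5, 1.5]
#   c: [1,    1,   1,   1,   1,   0]
#
# i.e. left arm at the open, low->high body, right arm at the close,
# with the final 0 in ``c`` lifting the pen before the next bar.)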
def gen_ohlc_qpath(
data: np.ndarray,
start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.43,
path: Optional[QtGui.QPainterPath] = None,
) -> QtGui.QPainterPath:
path_was_none = path is None
profiler = pg.debug.Profiler(
msg='gen_qpath ohlc',
disabled=not pg_profile_enabled(),
ms_threshold=ms_slower_then,
)
x, y, c = path_arrays_from_ohlc(
data,
start,
bar_gap=w,
)
profiler("generate stream with numba")
# TODO: numba the internals of this!
path = pg.functions.arrayToQPath(
x,
y,
connect=c,
path=path,
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
profiler("generate path with arrayToQPath")
return path
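# (The ``connect`` ndarray passed to ``pg.functions.arrayToQPath()``
# flags, per point, whether a segment is drawn to the *next* point
# (1) or the pen lifts (0); a tiny standalone sketch, demo-only:)
def _demo_connect_array() -> QtGui.QPainterPath:
    x = np.array([0., 1., 2., 3.])
    y = np.array([0., 1., 0., 1.])
    # two disjoint segments: point 0 -> 1 and point 2 -> 3
    c = np.array([1, 0, 1, 0], dtype=np.uint8)
    return pg.functions.arrayToQPath(x, y, connect=c)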
def ohlc_to_line(
ohlc_shm: ShmArray,
fields: list[str] = ['open', 'high', 'low', 'close']
) -> tuple[
int, # flattened first index
int, # flattened last index
np.ndarray,
np.ndarray,
]:
'''
Convert an input struct-array holding OHLC samples into a pair of
flattened x, y arrays with the same size (datum-wise) as the source
data.
'''
y_out = ohlc_shm.ustruct(fields)
first = ohlc_shm._first.value
last = ohlc_shm._last.value
# write pushed data to flattened copy
y_out[first:last] = rfn.structured_to_unstructured(
ohlc_shm.array[fields]
)
# generate a flat-interpolated x-domain
x_out = (
np.broadcast_to(
ohlc_shm._array['index'][:, None],
(
ohlc_shm._array.size,
# 4, # only ohlc
y_out.shape[1],
),
) + np.array([-0.5, 0, 0, 0.5])
)
assert y_out.any()
return (
first,
last,
x_out,
y_out,
)
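# (A toy version of the flattening above with plain numpy, no shm;
# demo-only helper, not part of this module:)
def _demo_ohlc_flatten() -> tuple[np.ndarray, np.ndarray]:
    ohlc = np.array(
        [(0, 1., 2., 0.5, 1.5), (1, 1.5, 3., 1., 2.)],
        dtype=[
            ('index', int), ('open', float), ('high', float),
            ('low', float), ('close', float),
        ],
    )
    fields = ['open', 'high', 'low', 'close']
    y_out = rfn.structured_to_unstructured(ohlc[fields])  # shape (2, 4)
    # 4 x coords per datum, one per ohlc field's y value
    x_out = (
        np.broadcast_to(ohlc['index'][:, None], y_out.shape)
        + np.array([-0.5, 0, 0, 0.5])
    )
    return x_out, y_out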
def to_step_format(
shm: ShmArray,
data_field: str,
index_field: str = 'index',
) -> tuple[int, np.ndarray, np.ndarray]:
'''
Convert an input 1d shm array to a "step array" format
for use by path graphics generation.
'''
first = shm._first.value
i = shm._array['index'].copy()
out = shm._array[data_field].copy()
x_out = np.broadcast_to(
i[:, None],
(i.size, 2),
) + np.array([-0.5, 0.5])
y_out = np.empty((len(out), 2), dtype=out.dtype)
y_out[:] = out[:, np.newaxis]
# start y at origin level
y_out[0, 0] = 0
return first, x_out, y_out
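# (Toy demo of the step conversion above: each sample becomes a flat
# "step" spanning [i - 0.5, i + 0.5] on the x-axis; demo-only:)
def _demo_step_format() -> tuple[np.ndarray, np.ndarray]:
    i = np.array([0, 1, 2])
    data = np.array([10., 12., 11.])
    x_out = np.broadcast_to(i[:, None], (i.size, 2)) + np.array([-0.5, 0.5])
    y_out = np.empty((len(data), 2), dtype=data.dtype)
    y_out[:] = data[:, np.newaxis]
    y_out[0, 0] = 0  # start the curve at the origin level
    # x_out[1] == [0.5, 1.5]; y_out[1] == [12., 12.]
    return x_out, y_out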


@@ -907,7 +907,9 @@ async def process_trades_and_update_ui(
mode.lines.remove_line(uuid=oid)
# each clearing tick is responded individually
elif resp in ('broker_filled',):
elif resp in (
'broker_filled',
):
known_order = book._sent_orders.get(oid)
if not known_order: