Compare commits

...

73 Commits

Author SHA1 Message Date
Tyler Goodlet 70921fcb8c Add some type annots around pp msg handling 2021-10-29 16:14:45 -04:00
Tyler Goodlet 5ea2273cfb Factor out context cacher to `tractor.trionics` 2021-10-29 16:14:45 -04:00
Tyler Goodlet e4ddc794ad Error out clearing task on first quote being nan 2021-10-29 16:14:45 -04:00
Tyler Goodlet 6ed455d23d Drop throttled rate margin to 100us 2021-10-29 16:14:45 -04:00
Tyler Goodlet 1cb3fedb81 Turn on profiling for the moment 2021-10-29 16:14:45 -04:00
Tyler Goodlet a461139a85 De-densify some funcs 2021-10-29 16:14:45 -04:00
Tyler Goodlet dfc407eb39 Add some typing around web bs 2021-10-29 16:14:45 -04:00
Tyler Goodlet 67a5ff54cb Rename feed bus entrypoint 2021-10-29 16:14:45 -04:00
Tyler Goodlet 9354d0d8e2 Update some typing and add latency checks for binance 2021-10-29 16:14:45 -04:00
Tyler Goodlet 95f4b2aa02 Expect accounts as tuple, don't start rt pnl on no live pp 2021-10-29 16:14:45 -04:00
Tyler Goodlet 80e8112daa Please please please let this dpi scaling hack work 2021-10-29 16:14:45 -04:00
Tyler Goodlet 3a1a906808 Port imports to tractor's new subpkg 2021-10-29 16:14:45 -04:00
Tyler Goodlet 521b995530 Drop old bps from fsp engine 2021-10-29 16:14:45 -04:00
Tyler Goodlet df2dc4d1c5 Repeat the click 3 times 2021-10-29 16:14:45 -04:00
Tyler Goodlet 0470a58e6c Comment on default account load order 2021-10-29 16:14:00 -04:00
Tyler Goodlet 36f1486739 Avoid value error on puterizing unit name 2021-10-29 16:14:00 -04:00
Tyler Goodlet eb75f37b25 Rage drop the limit size unit enum 2021-10-29 16:14:00 -04:00
Tyler Goodlet 6dde26a43f Start testing out trionics helpers, put vlm before rsi 2021-10-29 16:14:00 -04:00
Tyler Goodlet dea0d43ccb Make openGL flag actually work.. 2021-10-29 16:14:00 -04:00
Tyler Goodlet ae5a009c3e Fix exit-slot-edge-case when only one discrete unit remains 2021-10-29 16:14:00 -04:00
Tyler Goodlet 66da98ac5b Fix rsi history off-by-one due to `np.diff()` 2021-10-29 16:14:00 -04:00
Tyler Goodlet 80fa76e8a9 Move sync diffing helpers out of index loop 2021-10-29 16:14:00 -04:00
Tyler Goodlet 9cd63ffc99 Move "desynced" logic into a predicate 2021-10-29 16:14:00 -04:00
Tyler Goodlet b3ed09249a Fix the drunk fix
This should finally be correct fsp src-to-dst array syncing now..
There's a few edge cases but mostly we need to be sure we sync both
back-filled history diffs and avoid current step lag/leads. Use
a polling routine and the more stringent task re-spawn system to get
this right.
2021-10-29 16:14:00 -04:00
Tyler Goodlet 3aeb6e03f1 Sync history recalcs to diff checks via a "task tracker" 2021-10-29 16:14:00 -04:00
wattygetlood a5fc318015 Only scale down for scale < 2 2021-10-29 16:14:00 -04:00
Tyler Goodlet f0f103b60a Revert to old shm "last" meaning last row 2021-10-29 16:14:00 -04:00
Tyler Goodlet efcad0045a Drunkfix: finally solve the fsp alignment race? 2021-10-29 16:14:00 -04:00
Tyler Goodlet 4dd3b8869a Spawn and cache an fsp cluster ahead of time
Use a fixed worker count and don't respawn for every chart, instead
opting for a round-robin to tasks in a cluster and (for now) hoping for
the best in terms of trio scheduling, though we should obviously route
via symbol-locality next. This is currently a boon for chart spawning
startup times since actor creation is done AOT.

Additionally,
- use `zero_on_step` for dollar volume
- drop rsi on startup (again)
- add dollar volume (via fsp) along side unit volume
- litter more profiling to fsp chart startup sequence
- pre-define tick type classes for update loop
2021-10-29 16:14:00 -04:00
Tyler Goodlet 4c806b3187 Start trionics mod with an `async_enter_all` 2021-10-29 16:14:00 -04:00
Tyler Goodlet 42bb8e332c Activate/focus original window after feed reset 2021-10-29 16:14:00 -04:00
Tyler Goodlet fc12e72906 Add zero on increment support 2021-10-29 16:14:00 -04:00
Tyler Goodlet 3ed0739bbe Do fsp sync-to-source in sample step task 2021-10-29 16:14:00 -04:00
Tyler Goodlet 2a723ac994 Expose dollar volume to fsp engine
It can now be declared inside an fsp config dict under the name
`dolla_vlm`. We still need to offer an engine control that zeros
the newest sample value instead of copying from the previous.

This also litters the engine code with `pyqtgraph` profiling to see if
we can improve startup times - likely it'll mean pre-allocating a small
fsp daemon cluster at startup.
2021-10-29 16:14:00 -04:00
Tyler Goodlet 614bb1717b Fix shm index update race
There was a lingering issue where the fsp daemon would sync its shm
array with the source data and we'd set the start/end indices to the
same value. Under some races a reader would then read an empty `.array`
which it wasn't expecting. This fixes that as well as tidies up the
`ShmArray.push()` logic and adds a temporary check in `.array` for zero
length if the array hasn't been written yet.

We can now start removing read array length checks in consumer code
and hopefully no more races will show up.
2021-10-29 16:14:00 -04:00
Tyler Goodlet cd4f0e3276 TOSQUASH fix subplots.values() cuckup 2021-10-29 16:14:00 -04:00
Tyler Goodlet 63e7d1c914 Add first draft of "dollar volume" fsp 2021-10-29 16:14:00 -04:00
Tyler Goodlet 670de076fb Autoscale the y-range for all linked charts 2021-10-29 16:14:00 -04:00
Tyler Goodlet 825820e281 `graphics_name` is more explicit then `name` 2021-10-29 16:14:00 -04:00
Tyler Goodlet 7c0a2a6100 Process framed ticks by type in main graphics loop
We are already packing framed ticks in extended lists from
the `.data._sampling.uniform_rate_send()` task so the natural solution
to avoid needless graphics cycles for HFT-ish feeds (like binance) is
to unpack those frames and for most cases only update graphics with the
"latest" data per loop iteration. Unpacking in this way also lessens
nested-iterations per tick type.

Btw, this also effectively solves all remaining issues of fast tick
feeds over-triggering the graphics loop renders as long as the original
quote stream is throttled appropriately, usually to the local display
rate.

Relates to #183, #192

Dirty deats:
- drop all per-tick rate checks, they were always somewhat pointless
  when iterating a frame of ticks per render cycle XD.
- unpack tick frame into ticks per frame type, and last of each type;
  the lasts are used to update each part of the UI/graphics by class.
- only skip the label update if we can't retrieve the last from from a
  graphics source array; it seems `chart.update_curve_from_array()`
  already does a `len` check internally.
- add some draft commented code for tick type classes and a possible
  wire framed tick data structure.
- move `chart_maxmin()` range computer to module level, bind a chart to
  it with a `partial.`
- only check rate limits in main quote loop thus reporting actual
  overages
- add in commented logic for only updating the "last" cleared price from
  the most recent framed value if we want to eventually (right now seems
  like this is only relevant to ib and it's dark trades: `utrade`).
- rename `_clear_throttle_rate` -> `_quote_throttle_rate`, drop
  `_book_throttle_rate`.
2021-10-29 16:14:00 -04:00
Tyler Goodlet 29d41b36a3 Update fsps and overlays inside main OHLC chart update loop 2021-10-29 16:14:00 -04:00
Tyler Goodlet a36bbdea7a Fix color passthrough, make overlays a `dict` 2021-10-29 16:14:00 -04:00
Tyler Goodlet c67d90eace Factor FSP subplot update code into func
This is in prep toward doing fsp graphics updates from the main quotes
update loop (where OHLC and volume are done). Updating fsp output from
that task should, for the majority of cases, be fine presuming the
processing is derived from the quote stream as a source. Further,
calling an update function on each fsp subplot/overlay is of course
faster then a full task switch - which is how it currently works with
a separate stream for every fsp output. This also will let us delay
adding full `Feed` support around fsp streams for the moment while still
getting quote throttling dictated by the quote stream.

Going forward, We can still support a separate task/fsp stream for
updates as needed (ex. some kind of fast external data source that isn't
synced with price data) but it should be enabled as needed required by
the user.
2021-10-29 16:14:00 -04:00
Tyler Goodlet 6e226de692 Move top level fsp pkg code into an `_engine` module 2021-10-29 16:14:00 -04:00
Tyler Goodlet 6fffa071d2 More prep for FSP feeds
The major change is moving the fsp "daemon" (more like wanna-be fspd)
endpoint to use the newer `tractor.Portal.open_context()` and
bi-directional streaming api.

There's a few other things in here too:
- make a helper for allocating single colume fsp shm arrays
- rename some some fsp related functions to be more explicit on their
  purposes
2021-10-29 16:14:00 -04:00
Tyler Goodlet 429b6f6891 Port fsp daemon to tractor's context api 2021-10-29 16:14:00 -04:00
Tyler Goodlet 154e1f7087 Keep slots ratio of 1 on derivs at startup 2021-10-29 16:14:00 -04:00
Tyler Goodlet e00b98ac3b Force min pnl label width to avoid resizes on magnitude steps 2021-10-29 16:14:00 -04:00
Tyler Goodlet 3538cfd9a0 Shorten edit name, passthrough kwargs to adder methods 2021-10-29 16:14:00 -04:00
Tyler Goodlet be6bc86773 More explicit error on shm push overruns 2021-10-29 16:14:00 -04:00
Tyler Goodlet f973e39093 Update pp size label on settings changes
Resolves #232
2021-10-29 16:14:00 -04:00
Tyler Goodlet aede167996 Make `.paint()` method always the last 2021-10-29 16:13:28 -04:00
Tyler Goodlet 2227759f6e Always draw a last step line with px width=2 2021-10-29 16:13:28 -04:00
Tyler Goodlet 1eb170968d Clean up some imports, shift around some commented code 2021-10-29 16:13:28 -04:00
Tyler Goodlet 6db3afc5c0 Resize volume yaxis to in view range 2021-10-29 16:13:28 -04:00
Tyler Goodlet 342a8fd30c Update vlm sticky 2021-10-29 16:13:28 -04:00
Tyler Goodlet e0f7679128 Pass curve color through to y sticky label 2021-10-29 16:13:28 -04:00
Tyler Goodlet ec980fa353 Re-order grays by "lightness" 2021-10-29 16:13:28 -04:00
Tyler Goodlet a673fa3fee Add back in rsi 2021-10-29 16:13:28 -04:00
Tyler Goodlet 164009ff98 Increase current bar's pen size by a px 2021-10-29 16:13:28 -04:00
Tyler Goodlet db1827f689 Add dynamic subplot sizing logic, passthrouh step curve colors 2021-10-29 16:13:28 -04:00
Tyler Goodlet 3b0fbacefc Use filled rect for current step
A `QRectF` is easier to make and draw (i think?) so use that and fill it
on volume events for decent sleek real-time look. Adjust the step array
generator to allow for an endpoints flag. Comment and/or clean out all
the old path filling calls that gave us perf issues..
2021-10-29 16:13:28 -04:00
Tyler Goodlet b579fbc668 Add test logic for range based volume curve filling 2021-10-29 16:13:28 -04:00
Tyler Goodlet e1e521fdc1 Bleh, try a bunch of stuff for step filling
Turns out the performance of updating and refilling step curves > 1k ish
points is super slow :sadkek:. Disabling the fill basically returns
normal performance, so it seems maybe we'll stick with unfilled volume
"bars" for now. The other tricky bit is getting the path to extend and
fill which is particularly slow if you use the `QPainterPath.united()`
(what `+` set op does) operation which seems to require an entire redraw
of the curve each paint iteration. Removing the pixel buffer cache makes
things that much worse too..

One technique i tried was only setting a `._fill` flag when so many
datums are in view (< 1k as determined by the chart widget), and this
helps, but under high load (trade rates) you still see more lag then
without the fill which makes me say screw it and let's stick with
unfilled bars for now. Trying go to get performant filled curves will be
an exercise for an aspiring graphics eng :P
2021-10-29 16:13:28 -04:00
Tyler Goodlet fc1563dd90 Add last step updates and path fill support 2021-10-29 16:13:28 -04:00
Tyler Goodlet 6a915c75a7 Drop rsi from display by default 2021-10-29 16:13:28 -04:00
Tyler Goodlet 2df240cdfe Add todo for new view padding testing 2021-10-29 16:13:28 -04:00
Tyler Goodlet 553f001757 Add volume plot as default
Toss in support for a "step mode" curve (unfinished atm) and use it to
plot from the `volume` field of the ohlcv shm array (if available).

changes to make it happen,
- dynamically generate the fsp sidepane form from an input config `dict`
  |_ dynamically generate the underlying `pydantic` model
  |_
- add a "volume checker" helper func that inspects the shm array
- toss in sidepane resize calls to avoid race where the ohlcv array
  is plotted too slowly compared to the volume and the chart somehow
  doesn't show..
- drop duplicate rsi2 cruft (previously used to test plots of the shm
  data)
2021-10-29 16:13:28 -04:00
Tyler Goodlet 5d6ec278a3 Invert 'c' (connection) array
In latest `pyqtgraph` it seems there's a discrepancy
since `function.arrayToQPath()` was reworked and now
we need to *not* connect the last point for each bar.
2021-10-29 16:13:28 -04:00
Tyler Goodlet 768384f163 Draft 'step' curve; couldn't get pg builtin to work 2021-10-29 16:13:28 -04:00
Tyler Goodlet 643d2618ec Draft tina install section 2021-10-29 16:13:28 -04:00
Tyler Goodlet ef7d550e94 Toss in references step mode impl 2021-10-29 16:13:28 -04:00
Tyler Goodlet 89bf0b8d21 WIP fsp output throttling - not working yet 2021-10-29 16:12:34 -04:00
31 changed files with 1841 additions and 960 deletions

View File

@ -72,6 +72,34 @@ for a development install::
pip install -r requirements.txt -e .
install for tinas
*****************
for windows peeps you can start by getting `conda installed`_
and the `C++ build toolz`_ on your system.
then, `crack a conda shell`_ and run the following commands::
conda create piker --python=3.9
conda activate piker
conda install pip
pip install --upgrade setuptools
cd dIreCToRieZ\oF\cODez\piker\
pip install -r requirements -e .
in order to look coolio in front of all ur tina friends (and maybe
want to help us with testin, hackzing or configgin), install
`vscode`_ and `setup a coolio tiled wm console`_ so you can start
living the life of the tech literate..
.. _conda installed: https://
.. _C++ build toolz: https://
.. _crack a conda shell: https://
.. _vscode: https://
.. link to the tina guide
.. _setup a coolio tiled wm console: https://
provider support
****************
for live data feeds the in-progress set of supported brokers is:

View File

@ -18,30 +18,18 @@
Cacheing apis and toolz.
"""
# further examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
from collections import OrderedDict
from typing import (
Any,
Hashable,
Optional,
TypeVar,
AsyncContextManager,
)
from contextlib import (
asynccontextmanager,
)
import trio
from trio_typing import TaskStatus
import tractor
from tractor.trionics import maybe_open_context
from .brokers import get_brokermod
from .log import get_logger
T = TypeVar('T')
log = get_logger(__name__)
@ -74,112 +62,6 @@ def async_lifo_cache(maxsize=128):
return decorator
_cache: dict[str, 'Client'] = {} # noqa
class cache:
'''Globally (processs wide) cached, task access to a
kept-alive-while-in-use async resource.
'''
lock = trio.Lock()
users: int = 0
values: dict[Any, Any] = {}
resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None
@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
async with mng as value:
_, no_more_users = cls.resources[id(mng)]
cls.values[key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.resources.pop(id(mng))
@asynccontextmanager
async def maybe_open_ctx(
key: Hashable,
mngr: AsyncContextManager[T],
) -> (bool, T):
'''Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on
a cache hit.
'''
await cache.lock.acquire()
ctx_key = id(mngr)
value = None
try:
# lock feed acquisition around task racing / ``trio``'s
# scheduler protocol
value = cache.values[key]
log.info(f'Reusing cached resource for {key}')
cache.users += 1
cache.lock.release()
yield True, value
except KeyError:
log.info(f'Allocating new resource for {key}')
# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.
# TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in piker actors?
service_n = tractor.current_actor()._service_n
# TODO: does this need to be a tractor "root nursery"?
ln = cache.resources.get(ctx_key)
assert not ln
ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1
cache.lock.release()
yield False, value
finally:
cache.users -= 1
if cache.lock.locked():
cache.lock.release()
if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.warning(f'De-allocating resource for {key}')
# terminate mngr nursery
entry = cache.resources.get(ctx_key)
if entry:
_, no_more_users = entry
no_more_users.set()
@asynccontextmanager
async def open_cached_client(
brokername: str,
@ -190,7 +72,7 @@ async def open_cached_client(
'''
brokermod = get_brokermod(brokername)
async with maybe_open_ctx(
async with maybe_open_context(
key=brokername,
mngr=brokermod.get_client(),
) as (cache_hit, client):

View File

@ -47,7 +47,7 @@ _root_modules = [
class Services(BaseModel):
actor_n: tractor._trionics.ActorNursery
actor_n: tractor._supervise.ActorNursery
service_n: trio.Nursery
debug_mode: bool # tractor sub-actor debug mode flag
service_tasks: dict[str, tuple[trio.CancelScope, tractor.Portal]] = {}

View File

@ -21,7 +21,7 @@ Profiling wrappers for internal libs.
import time
from functools import wraps
_pg_profile: bool = False
_pg_profile: bool = True
def pg_profile_enabled() -> bool:

View File

@ -19,7 +19,7 @@ Binance backend
"""
from contextlib import asynccontextmanager
from typing import List, Dict, Any, Tuple, Union, Optional
from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator
import time
import trio
@ -37,7 +37,7 @@ from .._cacheables import open_cached_client
from ._util import resproc, SymbolNotFound
from ..log import get_logger, get_console_log
from ..data import ShmArray
from ..data._web_bs import open_autorecon_ws
from ..data._web_bs import open_autorecon_ws, NoBsWs
log = get_logger(__name__)
@ -213,7 +213,7 @@ class Client:
)
# repack in dict form
return {item[0]['symbol']: item[0]
for item in matches}
for item in matches}
async def bars(
self,
@ -295,7 +295,7 @@ class AggTrade(BaseModel):
M: bool # Ignore
async def stream_messages(ws):
async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]:
timeouts = 0
while True:
@ -487,11 +487,20 @@ async def stream_quotes(
# signal to caller feed is ready for consumption
feed_is_live.set()
# import time
# last = time.time()
# start streaming
async for typ, msg in msg_gen:
# period = time.time() - last
# hz = 1/period if period else float('inf')
# if hz > 60:
# log.info(f'Binance quotez : {hz}')
topic = msg['symbol'].lower()
await send_chan.send({topic: msg})
# last = time.time()
@tractor.context

View File

@ -87,13 +87,21 @@ class Allocator(BaseModel):
symbol: Symbol
account: Optional[str] = 'paper'
size_unit: SizeUnit = 'currency'
# TODO: for enums this clearly doesn't fucking work, you can't set
# a default at startup by passing in a `dict` but yet you can set
# that value through assignment..for wtv cucked reason.. honestly, pure
# unintuitive garbage.
size_unit: str = 'currency'
_size_units: dict[str, Optional[str]] = _size_units
@validator('size_unit')
def lookup_key(cls, v):
@validator('size_unit', pre=True)
def maybe_lookup_key(cls, v):
# apply the corresponding enum key for the text "description" value
return v.name
if v not in _size_units:
return _size_units.inverse[v]
assert v in _size_units
return v
# TODO: if we ever want ot support non-uniform entry-slot-proportion
# "sizes"
@ -157,6 +165,9 @@ class Allocator(BaseModel):
slot_size = currency_per_slot / price
l_sub_pp = (self.currency_limit - live_cost_basis) / price
else:
raise ValueError(f"Not valid size unit '{size}'")
# an entry (adding-to or starting a pp)
if (
action == 'buy' and live_size > 0 or
@ -204,7 +215,14 @@ class Allocator(BaseModel):
# **without** going past a net-zero pp. if the pp is
# > 1.5x a slot size, then front load: exit a slot's and
# expect net-zero to be acquired on the final exit.
slot_size < pp_size < round((1.5*slot_size), ndigits=ld)
slot_size < pp_size < round((1.5*slot_size), ndigits=ld) or
# underlying requires discrete (int) units (eg. stocks)
# and thus our slot size (based on our limit) would
# exit a fractional unit's worth so, presuming we aren't
# supporting a fractional-units-style broker, we need
# exit the final unit.
ld == 0 and abs_live_size == 1
):
order_size = abs_live_size
@ -259,7 +277,7 @@ def mk_allocator(
# default allocation settings
defaults: dict[str, float] = {
'account': None, # select paper by default
'size_unit': _size_units['currency'],
'size_unit': 'currency', #_size_units['currency'],
'units_limit': 400,
'currency_limit': 5e3,
'slots': 4,
@ -274,8 +292,8 @@ def mk_allocator(
# load and retreive user settings for default allocations
# ``config.toml``
user_def = {
'currency_limit': 5e3,
'slots': 4,
'currency_limit': 6e3,
'slots': 6,
}
defaults.update(user_def)
@ -287,6 +305,7 @@ def mk_allocator(
asset_type = symbol.type_key
# specific configs by asset class / type
if asset_type in ('future', 'option', 'futures_option'):
@ -308,9 +327,12 @@ def mk_allocator(
alloc.currency_limit = round(startup_size, ndigits=2)
else:
startup_size = startup_pp.size
startup_size = abs(startup_pp.size)
if startup_size > alloc.units_limit:
alloc.units_limit = startup_size
if asset_type in ('future', 'option', 'futures_option'):
alloc.slots = alloc.units_limit
return alloc

View File

@ -25,7 +25,7 @@ from dataclasses import dataclass, field
import trio
import tractor
from tractor._broadcast import broadcast_receiver
from tractor.trionics import broadcast_receiver
from ..data._source import Symbol
from ..log import get_logger

View File

@ -20,6 +20,7 @@ In da suit parlances: "Execution management systems"
"""
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from math import isnan
from pprint import pformat
import time
from typing import AsyncIterator, Callable
@ -47,9 +48,11 @@ log = get_logger(__name__)
# TODO: numba all of this
def mk_check(
trigger_price: float,
known_last: float,
action: str,
) -> Callable[[float, float], bool]:
"""Create a predicate for given ``exec_price`` based on last known
price, ``known_last``.
@ -77,8 +80,7 @@ def mk_check(
return check_lt
else:
return None
raise ValueError('trigger: {trigger_price}, last: {known_last}')
@dataclass
@ -177,7 +179,15 @@ async def clear_dark_triggers(
tuple(execs.items())
):
if not pred or (ttype not in tf) or (not pred(price)):
if (
not pred or
ttype not in tf or
not pred(price)
):
log.debug(
f'skipping quote for {sym} '
f'{pred}, {ttype} not in {tf}?, {pred(price)}'
)
# majority of iterations will be non-matches
continue
@ -269,7 +279,7 @@ class TradesRelay:
positions: dict[str, dict[str, BrokerdPosition]]
# allowed account names
accounts: set[str]
accounts: tuple[str]
# count of connected ems clients for this ``brokerd``
consumers: int = 0
@ -414,6 +424,9 @@ async def open_brokerd_trades_dialogue(
)
try:
positions: list[BrokerdPosition]
accounts: tuple[str]
async with (
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
brokerd_ctx.open_stream() as brokerd_trades_stream,
@ -449,7 +462,7 @@ async def open_brokerd_trades_dialogue(
relay = TradesRelay(
brokerd_dialogue=brokerd_trades_stream,
positions=pps,
accounts=set(accounts),
accounts=accounts,
consumers=1,
)
@ -1002,7 +1015,8 @@ async def _emsd_main(
first_quote = feed.first_quotes[symbol]
book = _router.get_dark_book(broker)
book.lasts[(broker, symbol)] = first_quote['last']
last = book.lasts[(broker, symbol)] = first_quote['last']
assert not isnan(last) # ib is a cucker but we've fixed it in the backend
# open a stream with the brokerd backend for order
# flow dialogue

View File

@ -172,7 +172,6 @@ async def sample_and_broadcast(
# iterate stream delivered by broker
async for quotes in quote_stream:
# TODO: ``numba`` this!
for sym, quote in quotes.items():
@ -185,8 +184,12 @@ async def sample_and_broadcast(
# start writing the shm buffer with appropriate
# trade data
for tick in quote['ticks']:
# TODO: we should probably not write every single
# value to an OHLC sample stream XD
# for a tick stream sure.. but this is excessive..
ticks = quote['ticks']
for tick in ticks:
ticktype = tick['type']
# write trade events to shm last OHLC sample
@ -246,7 +249,7 @@ async def sample_and_broadcast(
if tick_throttle:
# this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below.
await stream.send(quote)
await stream.send((sym, quote))
else:
await stream.send({sym: quote})
@ -258,7 +261,8 @@ async def sample_and_broadcast(
except (
trio.BrokenResourceError,
trio.ClosedResourceError
trio.ClosedResourceError,
trio.EndOfChannel,
):
# XXX: do we need to deregister here
# if it's done in the fee bus code?
@ -268,6 +272,10 @@ async def sample_and_broadcast(
f'{stream._ctx.chan.uid} dropped '
'`brokerd`-quotes-feed connection'
)
if tick_throttle:
assert stream.closed()
# await stream.aclose()
subs.remove((stream, tick_throttle))
@ -283,12 +291,12 @@ async def uniform_rate_send(
) -> None:
sleep_period = 1/rate - 0.000616
sleep_period = 1/rate - 0.0001 # 100us
last_send = time.time()
while True:
first_quote = await quote_stream.receive()
sym, first_quote = await quote_stream.receive()
start = time.time()
# append quotes since last iteration into the last quote's
@ -301,23 +309,36 @@ async def uniform_rate_send(
#
while True:
try:
next_quote = quote_stream.receive_nowait()
sym, next_quote = quote_stream.receive_nowait()
ticks = next_quote.get('ticks')
# XXX: idea for frame type data structure we could use on the
# wire instead of a simple list?
# frames = {
# 'index': ['type_a', 'type_c', 'type_n', 'type_n'],
# 'type_a': [tick0, tick1, tick2, .., tickn],
# 'type_b': [tick0, tick1, tick2, .., tickn],
# 'type_c': [tick0, tick1, tick2, .., tickn],
# ...
# 'type_n': [tick0, tick1, tick2, .., tickn],
# }
if ticks:
first_quote['ticks'].extend(ticks)
except trio.WouldBlock:
now = time.time()
rate = 1 / (now - last_send)
last_send = now
# print(f'{rate} Hz sending quotes') # \n{first_quote}')
log.debug(
f'`{sym}` throttled send hz: {round(rate, ndigits=1)}'
)
# TODO: now if only we could sync this to the display
# rate timing exactly lul
try:
await stream.send({first_quote['symbol']: first_quote})
await stream.send({sym: first_quote})
last_send = now
break
except trio.ClosedResourceError:
# if the feed consumer goes down then drop

View File

@ -31,7 +31,7 @@ import tractor
import numpy as np
from ..log import get_logger
from ._source import base_ohlc_dtype, base_iohlc_dtype
from ._source import base_iohlc_dtype
log = get_logger(__name__)
@ -168,6 +168,7 @@ class ShmArray:
self._len = len(shmarr)
self._shm = shm
self._post_init: bool = False
# pushing data does not write the index (aka primary key)
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
@ -196,7 +197,24 @@ class ShmArray:
@property
def array(self) -> np.ndarray:
return self._array[self._first.value:self._last.value]
'''Return an up-to-date ``np.ndarray`` view of the
so-far-written data to the underlying shm buffer.
'''
a = self._array[self._first.value:self._last.value]
# first, last = self._first.value, self._last.value
# a = self._array[first:last]
# TODO: eventually comment this once we've not seen it in the
# wild in a long time..
# XXX: race where first/last indexes cause a reader
# to load an empty array..
if len(a) == 0 and self._post_init:
raise RuntimeError('Empty array race condition hit!?')
# breakpoint()
return a
def last(
self,
@ -209,6 +227,7 @@ class ShmArray:
data: np.ndarray,
prepend: bool = False,
start: Optional[int] = None,
) -> int:
'''Ring buffer like "push" to append data
@ -217,12 +236,18 @@ class ShmArray:
NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel.
'''
self._post_init = True
length = len(data)
index = start or self._last.value
if prepend:
index = self._first.value - length
else:
index = self._last.value
if index < 0:
raise ValueError(
f'Array size of {self._len} was overrun during prepend.\n'
'You have passed {abs(index)} too many datums.'
)
end = index + length
@ -230,11 +255,22 @@ class ShmArray:
try:
self._array[fields][index:end] = data[fields][:]
# NOTE: there was a race here between updating
# the first and last indices and when the next reader
# tries to access ``.array`` (which due to the index
# overlap will be empty). Pretty sure we've fixed it now
# but leaving this here as a reminder.
if prepend:
assert index < self._first.value
if index < self._first.value:
self._first.value = index
else:
self._last.value = end
return end
except ValueError as err:
# shoudl raise if diff detected
self.diff_err_fields(data)
@ -290,20 +326,25 @@ class ShmArray:
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 12)
_default_size = 2 * _secs_in_day
_secs_in_day = int(60 * 60 * 24)
# we try for 3 times but only on a run-every-other-day kinda week.
_default_size = 3 * _secs_in_day
def open_shm_array(
key: Optional[str] = None,
size: int = _default_size,
dtype: Optional[np.dtype] = None,
readonly: bool = False,
) -> ShmArray:
"""Open a memory shared ``numpy`` using the standard library.
'''Open a memory shared ``numpy`` using the standard library.
This call unlinks (aka permanently destroys) the buffer on teardown
and thus should be used from the parent-most accessor (process).
"""
'''
# create new shared mem segment for which we
# have write permission
a = np.zeros(size, dtype=dtype)
@ -354,6 +395,7 @@ def open_shm_array(
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
tractor._actor._lifetime_stack.callback(shmarr.close)
tractor._actor._lifetime_stack.callback(shmarr.destroy)

View File

@ -133,9 +133,11 @@ def mk_symbol(
def from_df(
df: pd.DataFrame,
source=None,
default_tf=None
) -> np.recarray:
"""Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``.

View File

@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols.
"""
from contextlib import asynccontextmanager, AsyncExitStack
from types import ModuleType
from typing import Any, Callable
from typing import Any, Callable, AsyncGenerator
import json
import trio
@ -127,7 +127,7 @@ async def open_autorecon_ws(
# TODO: proper type annot smh
fixture: Callable,
):
) -> AsyncGenerator[tuple[...], NoBsWs]:
"""Apparently we can QoS for all sorts of reasons..so catch em.
"""

View File

@ -34,11 +34,10 @@ import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
# from tractor import _broadcast
from pydantic import BaseModel
from ..brokers import get_brokermod
from .._cacheables import maybe_open_ctx
from .._cacheables import maybe_open_context
from ..log import get_logger, get_console_log
from .._daemon import (
maybe_spawn_brokerd,
@ -247,7 +246,7 @@ async def allocate_persistent_feed(
@tractor.context
async def attach_feed_bus(
async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
@ -364,7 +363,7 @@ async def open_sample_step_stream(
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_ctx(
async with maybe_open_context(
key=delay_s,
mngr=portal.open_stream_from(
iter_ohlc_periods,
@ -507,7 +506,7 @@ async def open_feed(
portal.open_context(
attach_feed_bus,
open_feed_bus,
brokername=brokername,
symbol=sym,
loglevel=loglevel,
@ -586,7 +585,7 @@ async def maybe_open_feed(
'''
sym = symbols[0].lower()
async with maybe_open_ctx(
async with maybe_open_context(
key=(brokername, sym),
mngr=open_feed(
brokername,

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# Copyright (C) Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -14,33 +14,17 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Financial signal processing for the peeps.
"""
from functools import partial
from typing import AsyncIterator, Callable, Tuple, Optional
'''
Fin-sig-proc for the peeps!
'''
from typing import AsyncIterator
import trio
from trio_typing import TaskStatus
import tractor
import numpy as np
from ..log import get_logger, get_console_log
from .. import data
from ._momo import _rsi, _wma
from ._volume import _tina_vwap
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data._sharedmem import ShmArray
from ._engine import cascade
log = get_logger(__name__)
_fsps = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
}
__all__ = ['cascade']
async def latency(
@ -63,183 +47,3 @@ async def latency(
# stack tracing.
value = quote['brokerd_ts'] - quote['broker_ts']
yield value
async def fsp_compute(
ctx: tractor.Context,
symbol: str,
feed: Feed,
stream: trio.abc.ReceiveChannel,
src: ShmArray,
dst: ShmArray,
fsp_func_name: str,
func: Callable,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
# TODO: load appropriate fsp with input args
async def filter_by_sym(
sym: str,
stream,
):
# TODO: make this the actualy first quote from feed
# XXX: this allows for a single iteration to run for history
# processing without waiting on the real-time feed for a new quote
yield {}
# task cancellation won't kill the channel
# since we shielded at the `open_feed()` call
async for quotes in stream:
for symbol, quotes in quotes.items():
if symbol == sym:
yield quotes
out_stream = func(
filter_by_sym(symbol, stream),
feed.shm,
)
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
dst._first.value = src._first.value
dst._last.value = src._first.value
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[fsp_func_name] = history_output
# check for data length mis-allignment and fill missing values
diff = len(src.array) - len(history)
if diff >= 0:
log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}")
for _ in range(diff):
dst.push(history[:1])
# compare with source signal and time align
index = dst.push(history)
await ctx.send_yield(index)
# setup a respawn handle
with trio.CancelScope() as cs:
task_status.started(cs)
# rt stream
async for processed in out_stream:
# period = time.time() - last
# hz = 1/period if period else float('nan')
# if hz > 60:
# log.info(f'FSP quote too fast: {hz}')
log.debug(f"{fsp_func_name}: {processed}")
index = src.index
dst.array[-1][fsp_func_name] = processed
# stream latest shm array index entry
await ctx.send_yield(index)
@tractor.stream
async def cascade(
ctx: tractor.Context,
brokername: str,
src_shm_token: dict,
dst_shm_token: Tuple[str, np.dtype],
symbol: str,
fsp_func_name: str,
loglevel: Optional[str] = None,
) -> None:
'''Chain streaming signal processors and deliver output to
destination mem buf.
'''
if loglevel:
get_console_log(loglevel)
src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token)
func: Callable = _fsps[fsp_func_name]
# open a data feed stream with requested broker
async with data.feed.maybe_open_feed(
brokername,
[symbol],
# TODO:
# tick_throttle=60,
) as (feed, stream):
assert src.token == feed.shm.token
last_len = new_len = len(src.array)
fsp_target = partial(
fsp_compute,
ctx=ctx,
symbol=symbol,
feed=feed,
stream=stream,
src=src,
dst=dst,
fsp_func_name=fsp_func_name,
func=func
)
async with trio.open_nursery() as n:
cs = await n.start(fsp_target)
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with feed.index_stream() as stream:
async for msg in stream:
new_len = len(src.array)
if new_len > last_len + 1:
# respawn the signal compute task if the source
# signal has been updated
cs.cancel()
cs = await n.start(fsp_target)
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
# read out last shm row
array = dst.array
last = array[-1:].copy()
# write new row to the shm buffer
dst.push(last)
last_len = new_len

View File

@ -0,0 +1,343 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
core task logic for processing chains
'''
from dataclasses import dataclass
from functools import partial
from typing import AsyncIterator, Callable, Optional
import numpy as np
import pyqtgraph as pg
import trio
from trio_typing import TaskStatus
import tractor
from ..log import get_logger, get_console_log
from .. import data
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data._sharedmem import ShmArray
from ._momo import _rsi, _wma
from ._volume import _tina_vwap, dolla_vlm
log = get_logger(__name__)
_fsp_builtins = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
'dolla_vlm': dolla_vlm,
}
# TODO: things to figure the heck out:
# - how to handle non-plottable values (pyqtgraph has facility for this
# now in `arrayToQPath()`)
# - composition of fsps / implicit chaining syntax (we need an issue)
@dataclass
class TaskTracker:
complete: trio.Event
cs: trio.CancelScope
async def filter_quotes_by_sym(
sym: str,
quote_stream: tractor.MsgStream,
) -> AsyncIterator[dict]:
'''
Filter quote stream by target symbol.
'''
# TODO: make this the actual first quote from feed
# XXX: this allows for a single iteration to run for history
# processing without waiting on the real-time feed for a new quote
yield {}
async for quotes in quote_stream:
quote = quotes.get(sym)
if quote:
yield quote
async def fsp_compute(
stream: tractor.MsgStream,
symbol: str,
feed: Feed,
quote_stream: trio.abc.ReceiveChannel,
src: ShmArray,
dst: ShmArray,
func_name: str,
func: Callable,
attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
profiler = pg.debug.Profiler(
delayed=False,
disabled=True
)
out_stream = func(
# TODO: do we even need this if we do the feed api right?
# shouldn't a local stream do this before we get a handle
# to the async iterable? it's that or we do some kinda
# async itertools style?
filter_quotes_by_sym(symbol, quote_stream),
feed.shm,
)
# Conduct a single iteration of fsp with historical bars input
# and get historical output
history_output = await out_stream.__anext__()
profiler(f'{func_name} generated history')
# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
dtype=dst.array.dtype
)
history[func_name] = history_output
# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
# prepending a copy of the first value a few times to make
# sub-curves align with the parent bar chart.
# This likely needs to be fixed either by,
# - manually assigning the index and historical data
# seperately to the shm array (i.e. not using .push())
# - developing some system on top of the shared mem array that
# is `index` aware such that historical data can be indexed
# relative to the true first datum? Not sure if this is sane
# for incremental compuations.
first = dst._first.value = src._first.value
# TODO: can we use this `start` flag instead of the manual
# setting above?
index = dst.push(history, start=first)
profiler(f'{func_name} pushed history')
profiler.finish()
# setup a respawn handle
with trio.CancelScope() as cs:
tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index))
profiler(f'{func_name} yield last index')
# import time
# last = time.time()
try:
# rt stream
async for processed in out_stream:
log.debug(f"{func_name}: {processed}")
index = src.index
dst.array[-1][func_name] = processed
# NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts
# as trigger msg to tell the consumer to read from shm
if attach_stream:
await stream.send(index)
# period = time.time() - last
# hz = 1/period if period else float('nan')
# if hz > 60:
# log.info(f'FSP quote too fast: {hz}')
# last = time.time()
finally:
tracker.complete.set()
@tractor.context
async def cascade(
ctx: tractor.Context,
brokername: str,
src_shm_token: dict,
dst_shm_token: tuple[str, np.dtype],
symbol: str,
func_name: str,
zero_on_step: bool = False,
loglevel: Optional[str] = None,
) -> None:
'''
Chain streaming signal processors and deliver output to
destination shm array buffer.
'''
profiler = pg.debug.Profiler(delayed=False, disabled=False)
if loglevel:
get_console_log(loglevel)
src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token)
func: Callable = _fsp_builtins.get(func_name)
if not func:
# TODO: assume it's a func target path
raise ValueError('Unknown fsp target: {func_name}')
# open a data feed stream with requested broker
async with data.feed.maybe_open_feed(
brokername,
[symbol],
# TODO throttle tick outputs from *this* daemon since
# it'll emit tons of ticks due to the throttle only
# limits quote arrival periods, so the consumer of *this*
# needs to get throttled the ticks we generate.
# tick_throttle=60,
) as (feed, quote_stream):
profiler(f'{func_name}: feed up')
assert src.token == feed.shm.token
# last_len = new_len = len(src.array)
async with (
ctx.open_stream() as stream,
trio.open_nursery() as n,
):
fsp_target = partial(
fsp_compute,
stream=stream,
symbol=symbol,
feed=feed,
quote_stream=quote_stream,
# shm
src=src,
dst=dst,
func_name=func_name,
func=func
)
tracker, index = await n.start(fsp_target)
if zero_on_step:
last = dst.array[-1:]
zeroed = np.zeros(last.shape, dtype=last.dtype)
await ctx.started(index)
profiler(f'{func_name}: fsp up')
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
return await n.start(fsp_target)
def is_synced(
src: ShmArray,
dst: ShmArray
) -> tuple[bool, int, int]:
'''Predicate to dertmine if a destination FSP
output array is aligned to its source array.
'''
step_diff = src.index - dst.index
len_diff = abs(len(src.array) - len(dst.array))
return not (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2 or
# we aren't step synced to the source and may be
# leading/lagging by a step
step_diff > 1 or
step_diff < 0
), step_diff, len_diff
async def poll_and_sync_to_step(
tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst)
return tracker, step_diff
s, step, ld = is_synced(src, dst)
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with feed.index_stream() as stream:
profiler(f'{func_name}: sample stream up')
profiler.finish()
async for msg in stream:
# respawn the compute task if the source
# array has been updated such that we compute
# new history from the (prepended) source.
synced, step_diff, _ = is_synced(src, dst)
if not synced:
tracker, step_diff = await poll_and_sync_to_step(
tracker,
src,
dst,
)
# skip adding a last bar since we should already
# be step alinged
if step_diff == 0:
continue
# read out last shm row, copy and write new row
array = dst.array
# some metrics like vlm should be reset
# to zero every step.
if zero_on_step:
last = zeroed
else:
last = array[-1:].copy()
dst.push(last)

View File

@ -16,6 +16,7 @@
"""
Momentum bby.
"""
from typing import AsyncIterator, Optional
@ -23,12 +24,9 @@ import numpy as np
from numba import jit, float64, optional, int64
from ..data._normalize import iterticks
from ..data._sharedmem import ShmArray
# TODO: things to figure the fuck out:
# - how to handle non-plottable values
# - composition of fsps / implicit chaining
@jit(
float64[:](
float64[:],
@ -39,11 +37,14 @@ from ..data._normalize import iterticks
nogil=True
)
def ema(
y: 'np.ndarray[float64]',
alpha: optional(float64) = None,
ylast: optional(float64) = None,
) -> 'np.ndarray[float64]':
r"""Exponential weighted moving average owka 'Exponential smoothing'.
r'''
Exponential weighted moving average owka 'Exponential smoothing'.
- https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
- https://en.wikipedia.org/wiki/Exponential_smoothing
@ -68,7 +69,8 @@ def ema(
More discussion here:
https://stackoverflow.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm
"""
'''
n = y.shape[0]
if alpha is None:
@ -105,14 +107,21 @@ def ema(
# nogil=True
# )
def rsi(
# TODO: use https://github.com/ramonhagenaars/nptyping
signal: 'np.ndarray[float64]',
period: int64 = 14,
up_ema_last: float64 = None,
down_ema_last: float64 = None,
) -> 'np.ndarray[float64]':
'''
relative strengggth.
'''
alpha = 1/period
df = np.diff(signal)
df = np.diff(signal, prepend=0)
up = np.where(df > 0, df, 0)
up_ema = ema(up, alpha, up_ema_last)
@ -120,11 +129,12 @@ def rsi(
down = np.where(df < 0, -df, 0)
down_ema = ema(down, alpha, down_ema_last)
# avoid dbz errors
# avoid dbz errors, this leaves the first
# index == 0 right?
rs = np.divide(
up_ema,
down_ema,
out=np.zeros_like(up_ema),
out=np.zeros_like(signal),
where=down_ema != 0
)
@ -137,10 +147,18 @@ def rsi(
def wma(
signal: np.ndarray,
length: int,
weights: Optional[np.ndarray] = None,
) -> np.ndarray:
'''
Compute a windowed moving average of ``signal`` with window
``length`` and optional ``weights`` (must be same size as
``signal``).
'''
if weights is None:
# default is a standard arithmetic mean
seq = np.full((length,), 1)
@ -151,18 +169,22 @@ def wma(
return np.convolve(signal, weights, 'valid')
# @piker.fsp.signal(
# @piker.fsp.emit(
# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'],
# )
async def _rsi(
source: 'QuoteStream[Dict[str, Any]]', # noqa
ohlcv: "ShmArray[T<'close'>]",
ohlcv: ShmArray,
period: int = 14,
) -> AsyncIterator[np.ndarray]:
"""Multi-timeframe streaming RSI.
'''
Multi-timeframe streaming RSI.
https://en.wikipedia.org/wiki/Relative_strength_index
"""
'''
sig = ohlcv.array['close']
# wilder says to seed the RSI EMAs with the SMA for the "period"
@ -170,7 +192,8 @@ async def _rsi(
# TODO: the emas here should be seeded with a period SMA as per
# wilder's original formula..
rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed)
rsi_h, last_up_ema_close, last_down_ema_close = rsi(
sig, period, seed, seed)
up_ema_last = last_up_ema_close
down_ema_last = last_down_ema_close
@ -178,7 +201,6 @@ async def _rsi(
yield rsi_h
index = ohlcv.index
async for quote in source:
# tick based updates
for tick in iterticks(quote):
@ -206,16 +228,20 @@ async def _rsi(
async def _wma(
source, #: AsyncStream[np.ndarray],
length: int,
ohlcv: np.ndarray, # price time-frame "aware"
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming weighted moving average.
'''
Streaming weighted moving average.
``weights`` is a sequence of already scaled values. As an example
for the WMA often found in "techincal analysis":
``weights = np.arange(1, N) * N*(N-1)/2``.
"""
'''
# deliver historical output as "first yield"
yield wma(ohlcv.array['close'], length)

View File

@ -14,16 +14,20 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from typing import AsyncIterator, Optional
from typing import AsyncIterator, Optional, Union
import numpy as np
from tractor.trionics._broadcast import AsyncReceiver
from ..data._normalize import iterticks
from ..data._sharedmem import ShmArray
def wap(
signal: np.ndarray,
weights: np.ndarray,
) -> np.ndarray:
"""Weighted average price from signal and weights.
@ -47,15 +51,22 @@ def wap(
async def _tina_vwap(
source, #: AsyncStream[np.ndarray],
ohlcv: np.ndarray, # price time-frame "aware"
source: AsyncReceiver[dict],
ohlcv: ShmArray, # OHLC sampled history
# TODO: anchor logic (eg. to session start)
anchors: Optional[np.ndarray] = None,
) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream?
"""Streaming volume weighted moving average.
) -> Union[
AsyncIterator[np.ndarray],
float
]:
'''Streaming volume weighted moving average.
Calling this "tina" for now since we're using HLC3 instead of tick.
"""
'''
if anchors is None:
# TODO:
# anchor to session start of data if possible
@ -75,7 +86,6 @@ async def _tina_vwap(
# vwap_tot = h_vwap[-1]
async for quote in source:
for tick in iterticks(quote, types=['trade']):
# c, h, l, v = ohlcv.array[-1][
@ -91,3 +101,44 @@ async def _tina_vwap(
# yield ((((o + h + l) / 3) * v) weights_tot) / v_tot
yield w_tot / v_tot
async def dolla_vlm(
source: AsyncReceiver[dict],
ohlcv: ShmArray, # OHLC sampled history
) -> Union[
AsyncIterator[np.ndarray],
float
]:
a = ohlcv.array
chl3 = (a['close'] + a['high'] + a['low']) / 3
v = a['volume']
# history
yield chl3 * v
i = ohlcv.index
lvlm = 0
async for quote in source:
for tick in iterticks(quote):
# this computes tick-by-tick weightings from here forward
size = tick['size']
price = tick['price']
li = ohlcv.index
if li > i:
i = li
lvlm = 0
c, h, l, v = ohlcv.last()[
['close', 'high', 'low', 'volume']
][0]
lvlm += price * size
tina_lvlm = c+h+l/3 * v
# print(f' tinal vlm: {tina_lvlm}')
yield lvlm

80
piker/trionics.py 100644
View File

@ -0,0 +1,80 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
sugarz for trio/tractor conc peeps.
'''
from typing import AsyncContextManager
from typing import TypeVar
from contextlib import asynccontextmanager as acm
import trio
# A regular invariant generic type
T = TypeVar("T")
async def _enter_and_sleep(
mngr: AsyncContextManager[T],
to_yield: dict[int, T],
all_entered: trio.Event,
# task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> T:
'''Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled.
'''
async with mngr as value:
to_yield[id(mngr)] = value
if all(to_yield.values()):
all_entered.set()
# sleep until cancelled
await trio.sleep_forever()
@acm
async def async_enter_all(
*mngrs: list[AsyncContextManager[T]],
) -> tuple[T]:
to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event()
async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
_enter_and_sleep,
mngr,
to_yield,
all_entered,
)
# deliver control once all managers have started up
await all_entered.wait()
yield tuple(to_yield.values())
# tear down all sleeper tasks thus triggering individual
# mngr ``__aexit__()``s.
n.cancel_scope.cancel()

View File

@ -85,11 +85,11 @@ async def _async_main(
screen = godwidget.window.current_screen()
# configure graphics update throttling based on display refresh rate
_display._clear_throttle_rate = min(
_display._quote_throttle_rate = min(
round(screen.refreshRate()),
_display._clear_throttle_rate,
_display._quote_throttle_rate,
)
log.info(f'Set graphics update rate to {_display._clear_throttle_rate} Hz')
log.info(f'Set graphics update rate to {_display._quote_throttle_rate} Hz')
# TODO: do styling / themeing setup
# _style.style_ze_sheets(godwidget)

View File

@ -25,6 +25,9 @@ from PyQt5.QtCore import Qt
from PyQt5.QtWidgets import (
QFrame,
QWidget,
QHBoxLayout,
QVBoxLayout,
QSplitter,
# QSizePolicy,
)
import numpy as np
@ -53,6 +56,7 @@ from ._style import (
)
from ..data.feed import Feed
from ..data._source import Symbol
from ..data._sharedmem import ShmArray
from ..log import get_logger
from ._interaction import ChartView
from ._forms import FieldsForm
@ -64,11 +68,11 @@ log = get_logger(__name__)
class GodWidget(QWidget):
'''
"Our lord and savior, the holy child of window-shua, there is no
widget above thee." - 6|6
widget above thee." - 6||6
The highest level composed widget which contains layouts for
organizing lower level charts as well as other widgets used to
control or modify them.
organizing charts as well as other sub-widgets used to control or
modify them.
'''
def __init__(
@ -80,19 +84,19 @@ class GodWidget(QWidget):
super().__init__(parent)
self.hbox = QtWidgets.QHBoxLayout(self)
self.hbox = QHBoxLayout(self)
self.hbox.setContentsMargins(0, 0, 0, 0)
self.hbox.setSpacing(6)
self.hbox.setAlignment(Qt.AlignTop)
self.vbox = QtWidgets.QVBoxLayout()
self.vbox = QVBoxLayout()
self.vbox.setContentsMargins(0, 0, 0, 0)
self.vbox.setSpacing(2)
self.vbox.setAlignment(Qt.AlignTop)
self.hbox.addLayout(self.vbox)
# self.toolbar_layout = QtWidgets.QHBoxLayout()
# self.toolbar_layout = QHBoxLayout()
# self.toolbar_layout.setContentsMargins(0, 0, 0, 0)
# self.vbox.addLayout(self.toolbar_layout)
@ -106,25 +110,8 @@ class GodWidget(QWidget):
# assigned in the startup func `_async_main()`
self._root_n: trio.Nursery = None
def set_chart_symbol(
self,
symbol_key: str, # of form <fqsn>.<providername>
linkedsplits: 'LinkedSplits', # type: ignore
) -> None:
# re-sort org cache symbol list in LIFO order
cache = self._chart_cache
cache.pop(symbol_key, None)
cache[symbol_key] = linkedsplits
def get_chart_symbol(
self,
symbol_key: str,
) -> 'LinkedSplits': # type: ignore
return self._chart_cache.get(symbol_key)
# def init_timeframes_ui(self):
# self.tf_layout = QtWidgets.QHBoxLayout()
# self.tf_layout = QHBoxLayout()
# self.tf_layout.setSpacing(0)
# self.tf_layout.setContentsMargins(0, 12, 0, 0)
# time_frames = ('1M', '5M', '15M', '30M', '1H', '1D', '1W', 'MN')
@ -145,6 +132,23 @@ class GodWidget(QWidget):
# self.strategy_box = StrategyBoxWidget(self)
# self.toolbar_layout.addWidget(self.strategy_box)
def set_chart_symbol(
self,
symbol_key: str, # of form <fqsn>.<providername>
linkedsplits: 'LinkedSplits', # type: ignore
) -> None:
# re-sort org cache symbol list in LIFO order
cache = self._chart_cache
cache.pop(symbol_key, None)
cache[symbol_key] = linkedsplits
def get_chart_symbol(
self,
symbol_key: str,
) -> 'LinkedSplits': # type: ignore
return self._chart_cache.get(symbol_key)
async def load_symbol(
self,
@ -255,7 +259,7 @@ class ChartnPane(QFrame):
'''
sidepane: FieldsForm
hbox: QtWidgets.QHBoxLayout
hbox: QHBoxLayout
chart: Optional['ChartPlotWidget'] = None
def __init__(
@ -271,7 +275,7 @@ class ChartnPane(QFrame):
self.sidepane = sidepane
self.chart = None
hbox = self.hbox = QtWidgets.QHBoxLayout(self)
hbox = self.hbox = QHBoxLayout(self)
hbox.setAlignment(Qt.AlignTop | Qt.AlignLeft)
hbox.setContentsMargins(0, 0, 0, 0)
hbox.setSpacing(3)
@ -281,21 +285,14 @@ class ChartnPane(QFrame):
class LinkedSplits(QWidget):
'''
Widget that holds a central chart plus derived
subcharts computed from the original data set apart
by splitters for resizing.
Composite that holds a central chart plus a set of (derived)
subcharts (usually computed from the original data) arranged in
a splitter for resizing.
A single internal references to the data is maintained
for each chart and can be updated externally.
'''
long_pen = pg.mkPen('#006000')
long_brush = pg.mkBrush('#00ff00')
short_pen = pg.mkPen('#600000')
short_brush = pg.mkBrush('#ff0000')
zoomIsDisabled = QtCore.pyqtSignal(bool)
def __init__(
self,
@ -325,11 +322,11 @@ class LinkedSplits(QWidget):
# self.xaxis_ind.setStyle(showValues=False)
# self.xaxis.hide()
self.splitter = QtWidgets.QSplitter(QtCore.Qt.Vertical)
self.splitter.setMidLineWidth(1)
self.splitter.setHandleWidth(0)
self.splitter = QSplitter(QtCore.Qt.Vertical)
self.splitter.setMidLineWidth(0)
self.splitter.setHandleWidth(2)
self.layout = QtWidgets.QVBoxLayout(self)
self.layout = QVBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.layout.addWidget(self.splitter)
@ -341,20 +338,28 @@ class LinkedSplits(QWidget):
def set_split_sizes(
self,
# prop: float = 0.375, # proportion allocated to consumer subcharts
prop: float = 5/8,
prop: Optional[float] = None,
) -> None:
'''Set the proportion of space allocated for linked subcharts.
'''
ln = len(self.subplots)
if not prop:
# proportion allocated to consumer subcharts
if ln < 2:
prop = 1/(.666 * 6)
elif ln >= 2:
prop = 3/8
major = 1 - prop
min_h_ind = int((self.height() * prop) / len(self.subplots))
min_h_ind = int((self.height() * prop) / ln)
sizes = [int(self.height() * major)]
sizes.extend([min_h_ind] * len(self.subplots))
sizes.extend([min_h_ind] * ln)
self.splitter.setSizes(sizes) # , int(self.height()*0.2)
self.splitter.setSizes(sizes)
def focus(self) -> None:
if self.chart is not None:
@ -374,16 +379,21 @@ class LinkedSplits(QWidget):
style: str = 'bar',
) -> 'ChartPlotWidget':
"""Start up and show main (price) chart and all linked subcharts.
'''Start up and show main (price) chart and all linked subcharts.
The data input struct array must include OHLC fields.
"""
'''
# add crosshairs
self.cursor = Cursor(
linkedsplits=self,
digits=symbol.tick_size_digits,
)
# NOTE: atm the first (and only) OHLC price chart for the symbol
# is given a special reference but in the future there shouldn't
# be no distinction since we will have multiple symbols per
# view as part of "aggregate feeds".
self.chart = self.add_plot(
name=symbol.key,
@ -425,9 +435,7 @@ class LinkedSplits(QWidget):
**cpw_kwargs,
) -> 'ChartPlotWidget':
'''Add (sub)plots to chart widget by name.
If ``name`` == ``"main"`` the chart will be the the primary view.
'''Add (sub)plots to chart widget by key.
'''
if self.chart is None and not _is_main:
@ -495,8 +503,9 @@ class LinkedSplits(QWidget):
cpw.plotItem.vb.linkedsplits = self
cpw.setFrameStyle(
QtWidgets.QFrame.StyledPanel
# | QtWidgets.QFrame.Plain)
# | QtWidgets.QFrame.Plain
)
cpw.hideButtons()
# XXX: gives us outline on backside of y-axis
@ -515,7 +524,22 @@ class LinkedSplits(QWidget):
cpw.draw_ohlc(name, array, array_key=array_key)
elif style == 'line':
cpw.draw_curve(name, array, array_key=array_key)
cpw.draw_curve(
name,
array,
array_key=array_key,
color='default_light',
)
elif style == 'step':
cpw.draw_curve(
name,
array,
array_key=array_key,
step_mode=True,
color='davies',
fill_color='davies',
)
else:
raise ValueError(f"Chart style {style} is currently unsupported")
@ -523,14 +547,7 @@ class LinkedSplits(QWidget):
if not _is_main:
# track by name
self.subplots[name] = cpw
# if sidepane:
# # TODO: use a "panes" collection to manage this?
# qframe.setMaximumWidth(self.chart.sidepane.width())
# qframe.setMinimumWidth(self.chart.sidepane.width())
self.splitter.addWidget(qframe)
# scale split regions
self.set_split_sizes()
@ -586,6 +603,9 @@ class ChartPlotWidget(pg.PlotWidget):
view_color: str = 'papas_special',
pen_color: str = 'bracket',
# TODO: load from config
use_open_gl: bool = False,
static_yrange: Optional[tuple[float, float]] = None,
**kwargs,
@ -600,9 +620,9 @@ class ChartPlotWidget(pg.PlotWidget):
# parent=None,
# plotItem=None,
# antialias=True,
useOpenGL=True,
**kwargs
)
self.useOpenGL(use_open_gl)
self.name = name
self.data_key = data_key
self.linked = linkedsplits
@ -619,7 +639,8 @@ class ChartPlotWidget(pg.PlotWidget):
'ohlc': array,
}
self._graphics = {} # registry of underlying graphics
self._overlays = set() # registry of overlay curve names
# registry of overlay curve names
self._overlays: dict[str, ShmArray] = {}
self._feeds: dict[Symbol, Feed] = {}
@ -732,6 +753,7 @@ class ChartPlotWidget(pg.PlotWidget):
self._vb.setXRange(
min=l + 1,
max=r + 1,
# TODO: holy shit, wtf dude... why tf would this not be 0 by
# default... speechless.
padding=0,
@ -772,7 +794,7 @@ class ChartPlotWidget(pg.PlotWidget):
update_func=ContentsLabel.update_from_ohlc,
)
self._add_sticky(name)
self._add_sticky(name, bg_color='davies')
return graphics
@ -784,7 +806,7 @@ class ChartPlotWidget(pg.PlotWidget):
array_key: Optional[str] = None,
overlay: bool = False,
color: str = 'default_light',
color: Optional[str] = None,
add_label: bool = True,
**pdi_kwargs,
@ -794,15 +816,18 @@ class ChartPlotWidget(pg.PlotWidget):
the input array ``data``.
"""
_pdi_defaults = {
'pen': pg.mkPen(hcolor(color)),
}
pdi_kwargs.update(_pdi_defaults)
color = color or self.pen_color or 'default_light'
pdi_kwargs.update({
'color': color
})
data_key = array_key or name
# pg internals for reference.
# curve = pg.PlotDataItem(
# curve = pg.PlotCurveItem(
# yah, we wrote our own B)
curve = FastAppendCurve(
y=data[data_key],
x=data['index'],
@ -840,14 +865,14 @@ class ChartPlotWidget(pg.PlotWidget):
if overlay:
anchor_at = ('bottom', 'left')
self._overlays.add(name)
self._overlays[name] = None
else:
anchor_at = ('top', 'left')
# TODO: something instead of stickies for overlays
# (we need something that avoids clutter on x-axis).
self._add_sticky(name, bg_color='default_light')
self._add_sticky(name, bg_color=color)
if self.linked.cursor:
self.linked.cursor.add_curve_cursor(self, curve)
@ -861,6 +886,7 @@ class ChartPlotWidget(pg.PlotWidget):
return curve
# TODO: make this a ctx mngr
def _add_sticky(
self,
@ -890,67 +916,78 @@ class ChartPlotWidget(pg.PlotWidget):
def update_ohlc_from_array(
self,
name: str,
graphics_name: str,
array: np.ndarray,
**kwargs,
) -> pg.GraphicsObject:
"""Update the named internal graphics from ``array``.
"""
) -> pg.GraphicsObject:
'''Update the named internal graphics from ``array``.
'''
self._arrays['ohlc'] = array
graphics = self._graphics[name]
graphics = self._graphics[graphics_name]
graphics.update_from_array(array, **kwargs)
return graphics
def update_curve_from_array(
self,
name: str,
graphics_name: str,
array: np.ndarray,
array_key: Optional[str] = None,
**kwargs,
) -> pg.GraphicsObject:
"""Update the named internal graphics from ``array``.
'''Update the named internal graphics from ``array``.
"""
'''
assert len(array)
data_key = array_key or graphics_name
data_key = array_key or name
if name not in self._overlays:
if graphics_name not in self._overlays:
self._arrays['ohlc'] = array
else:
self._arrays[data_key] = array
curve = self._graphics[name]
curve = self._graphics[graphics_name]
if len(array):
# TODO: we should instead implement a diff based
# "only update with new items" on the pg.PlotCurveItem
# one place to dig around this might be the `QBackingStore`
# https://doc.qt.io/qt-5/qbackingstore.html
# curve.setData(y=array[name], x=array['index'], **kwargs)
curve.update_from_array(
x=array['index'],
y=array[data_key],
**kwargs
)
# NOTE: back when we weren't implementing the curve graphics
# ourselves you'd have updates using this method:
# curve.setData(y=array[graphics_name], x=array['index'], **kwargs)
# NOTE: graphics **must** implement a diff based update
# operation where an internal ``FastUpdateCurve._xrange`` is
# used to determine if the underlying path needs to be
# pre/ap-pended.
curve.update_from_array(
x=array['index'],
y=array[data_key],
**kwargs
)
return curve
def _set_yrange(
self,
*,
yrange: Optional[tuple[float, float]] = None,
range_margin: float = 0.06,
bars_range: Optional[tuple[int, int, int, int]] = None,
# flag to prevent triggering sibling charts from the same linked
# set from recursion errors.
autoscale_linked_plots: bool = True,
) -> None:
"""Set the viewable y-range based on embedded data.
'''Set the viewable y-range based on embedded data.
This adds auto-scaling like zoom on the scroll wheel such
that data always fits nicely inside the current view of the
data set.
"""
'''
set_range = True
if self._static_yrange == 'axis':
@ -966,52 +1003,50 @@ class ChartPlotWidget(pg.PlotWidget):
# Determine max, min y values in viewable x-range from data.
# Make sure min bars/datums on screen is adhered.
l, lbar, rbar, r = self.bars_range()
l, lbar, rbar, r = bars_range or self.bars_range()
# figure out x-range in view such that user can scroll "off"
# the data set up to the point where ``_min_points_to_show``
# are left.
# view_len = r - l
if autoscale_linked_plots:
# avoid recursion by sibling plots
linked = self.linked
plots = list(linked.subplots.copy().values())
main = linked.chart
if main:
plots.append(main)
for chart in plots:
if chart and not chart._static_yrange:
chart._set_yrange(
bars_range=(l, lbar, rbar, r),
autoscale_linked_plots=False,
)
# TODO: logic to check if end of bars in view
# extra = view_len - _min_points_to_show
# begin = self._arrays['ohlc'][0]['index'] - extra
# # end = len(self._arrays['ohlc']) - 1 + extra
# end = self._arrays['ohlc'][-1]['index'] - 1 + extra
# XXX: test code for only rendering lines for the bars in view.
# This turns out to be very very poor perf when scaling out to
# many bars (think > 1k) on screen.
# name = self.name
# bars = self._graphics[self.name]
# bars.draw_lines(
# istart=max(lbar, l), iend=min(rbar, r), just_history=True)
# bars_len = rbar - lbar
# log.debug(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
# )
# self._set_xlimits(begin, end)
# TODO: this should be some kind of numpy view api
# bars = self._arrays['ohlc'][lbar:rbar]
a = self._arrays['ohlc']
ifirst = a[0]['index']
bars = a[lbar - ifirst:rbar - ifirst + 1]
if not len(bars):
# likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}")
return
if self.data_key != self.linked.symbol.key:
bars = a[self.data_key]
bars = bars[self.data_key]
ylow = np.nanmin(bars)
yhigh = np.nanmax((bars))
yhigh = np.nanmax(bars)
# print(f'{(ylow, yhigh)}')
else:
# just the std ohlc bars
ylow = np.nanmin(bars['low'])
@ -1072,7 +1107,6 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: this should go onto some sort of
# data-view strimg thinger..right?
ohlc = self._shm.array
# ohlc = chart._shm.array
# XXX: not sure why the time is so off here
# looks like we're gonna have to do some fixing..

View File

@ -18,25 +18,105 @@
Fast, smooth, sexy curves.
"""
from typing import Tuple
from typing import Optional
import numpy as np
import pyqtgraph as pg
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5 import QtGui, QtWidgets
from PyQt5.QtCore import (
QLineF,
QSizeF,
QRectF,
QPointF,
)
from .._profile import pg_profile_enabled
from ._style import hcolor
def step_path_arrays_from_1d(
x: np.ndarray,
y: np.ndarray,
include_endpoints: bool = False,
) -> (np.ndarray, np.ndarray):
'''Generate a "step mode" curve aligned with OHLC style bars
such that each segment spans each bar (aka "centered" style).
'''
y_out = y.copy()
x_out = x.copy()
x2 = np.empty(
# the data + 2 endpoints on either end for
# "termination of the path".
(len(x) + 1, 2),
# we want to align with OHLC or other sampling style
# bars likely so we need fractinal values
dtype=float,
)
x2[0] = x[0] - 0.5
x2[1] = x[0] + 0.5
x2[1:] = x[:, np.newaxis] + 0.5
# flatten to 1-d
x_out = x2.reshape(x2.size)
# we create a 1d with 2 extra indexes to
# hold the start and (current) end value for the steps
# on either end
y2 = np.empty((len(y), 2), dtype=y.dtype)
y2[:] = y[:, np.newaxis]
y_out = np.empty(
2*len(y) + 2,
dtype=y.dtype
)
# flatten and set 0 endpoints
y_out[1:-1] = y2.reshape(y2.size)
y_out[0] = 0
y_out[-1] = 0
if not include_endpoints:
return x_out[:-1], y_out[:-1]
else:
return x_out, y_out
# TODO: got a feeling that dropping this inheritance gets us even more speedups
class FastAppendCurve(pg.PlotCurveItem):
def __init__(self, *args, **kwargs):
def __init__(
self,
*args,
step_mode: bool = False,
color: str = 'default_lightest',
fill_color: Optional[str] = None,
**kwargs
) -> None:
# TODO: we can probably just dispense with the parent since
# we're basically only using the pen setting now...
super().__init__(*args, **kwargs)
self._last_line: QtCore.QLineF = None
self._xrange: Tuple[int, int] = self.dataBounds(ax=0)
self._xrange: tuple[int, int] = self.dataBounds(ax=0)
# all history of curve is drawn in single px thickness
self.setPen(hcolor(color))
# last segment is drawn in 2px thickness for emphasis
self.last_step_pen = pg.mkPen(hcolor(color), width=2)
self._last_line: QLineF = None
self._last_step_rect: QRectF = None
# flat-top style histogram-like discrete curve
self._step_mode: bool = step_mode
self._fill = False
self.setBrush(hcolor(fill_color or color))
# TODO: one question still remaining is if this makes trasform
# interactions slower (such as zooming) and if so maybe if/when
@ -46,8 +126,9 @@ class FastAppendCurve(pg.PlotCurveItem):
def update_from_array(
self,
x,
y,
x: np.ndarray,
y: np.ndarray,
) -> QtGui.QPainterPath:
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
@ -59,14 +140,27 @@ class FastAppendCurve(pg.PlotCurveItem):
prepend_length = istart - x[0]
append_length = x[-1] - istop
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
x_out, y_out = step_path_arrays_from_1d(x[:-1], y[:-1])
else:
# by default we only pull data up to the last (current) index
x_out, y_out = x[:-1], y[:-1]
if self.path is None or prepend_length:
self.path = pg.functions.arrayToQPath(
x[:-1],
y[:-1],
connect='all'
x_out,
y_out,
connect='all',
finiteCheck=False,
)
profiler('generate fresh path')
# if self._step_mode:
# self.path.closeSubpath()
# TODO: get this working - right now it's giving heck on vwap...
# if prepend_length:
# breakpoint()
@ -83,21 +177,47 @@ class FastAppendCurve(pg.PlotCurveItem):
# # self.path.moveTo(new_x[0], new_y[0])
# self.path.connectPath(old_path)
if append_length:
# print(f"append_length: {append_length}")
new_x = x[-append_length - 2:-1]
new_y = y[-append_length - 2:-1]
# print((new_x, new_y))
elif append_length:
if self._step_mode:
new_x, new_y = step_path_arrays_from_1d(
x[-append_length - 2:-1],
y[-append_length - 2:-1],
)
new_x = new_x[1:]
new_y = new_y[1:]
else:
# print(f"append_length: {append_length}")
new_x = x[-append_length - 2:-1]
new_y = y[-append_length - 2:-1]
# print((new_x, new_y))
append_path = pg.functions.arrayToQPath(
new_x,
new_y,
connect='all'
connect='all',
# finiteCheck=False,
)
# print(f"append_path br: {append_path.boundingRect()}")
# self.path.moveTo(new_x[0], new_y[0])
# self.path.connectPath(append_path)
self.path.connectPath(append_path)
path = self.path
# other merging ideas:
# https://stackoverflow.com/questions/8936225/how-to-merge-qpainterpaths
if self._step_mode:
if self._fill:
# XXX: super slow set "union" op
self.path = self.path.united(append_path).simplified()
# path.addPath(append_path)
# path.closeSubpath()
else:
# path.addPath(append_path)
self.path.connectPath(append_path)
else:
# print(f"append_path br: {append_path.boundingRect()}")
# self.path.moveTo(new_x[0], new_y[0])
# self.path.connectPath(append_path)
path.connectPath(append_path)
# XXX: pretty annoying but, without this there's little
# artefacts on the append updates to the curve...
@ -112,8 +232,23 @@ class FastAppendCurve(pg.PlotCurveItem):
self.xData = x
self.yData = y
self._xrange = x[0], x[-1]
self._last_line = QtCore.QLineF(x[-2], y[-2], x[-1], y[-1])
x0, x_last = self._xrange = x[0], x[-1]
y_last = y[-1]
if self._step_mode:
self._last_line = QLineF(
x_last - 0.5, 0,
x_last + 0.5, 0,
)
self._last_step_rect = QRectF(
x_last - 0.5, 0,
x_last + 0.5, y_last
)
else:
self._last_line = QLineF(
x[-2], y[-2],
x[-1], y_last
)
# trigger redraw of path
# do update before reverting to cache mode
@ -143,13 +278,13 @@ class FastAppendCurve(pg.PlotCurveItem):
w = hb_size.width() + 1
h = hb_size.height() + 1
br = QtCore.QRectF(
br = QRectF(
# top left
QtCore.QPointF(hb.topLeft()),
QPointF(hb.topLeft()),
# total size
QtCore.QSizeF(w, h)
QSizeF(w, h)
)
# print(f'bounding rect: {br}')
return br
@ -164,9 +299,26 @@ class FastAppendCurve(pg.PlotCurveItem):
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
# p.setRenderHint(p.Antialiasing, True)
p.setPen(self.opts['pen'])
if self._step_mode:
brush = self.opts['brush']
# p.drawLines(*tuple(filter(bool, self._last_step_lines)))
# p.drawRect(self._last_step_rect)
p.fillRect(self._last_step_rect, brush)
# p.drawPath(self.path)
# profiler('.drawPath()')
# else:
p.setPen(self.last_step_pen)
p.drawLine(self._last_line)
profiler('.drawLine()')
p.setPen(self.opts['pen'])
p.drawPath(self.path)
profiler('.drawPath()')
if self._fill:
print('FILLED')
p.fillPath(self.path, brush)

File diff suppressed because it is too large Load Diff

View File

@ -48,7 +48,7 @@ from ._style import hcolor, _font, _font_small, DpiAwareFont
from ._label import FormatLabel
class FontAndChartAwareLineEdit(QLineEdit):
class Edit(QLineEdit):
def __init__(
@ -369,13 +369,14 @@ class FieldsForm(QWidget):
key: str,
label_name: str,
value: str,
readonly: bool = False,
) -> FontAndChartAwareLineEdit:
) -> Edit:
# TODO: maybe a distint layout per "field" item?
label = self.add_field_label(label_name)
edit = FontAndChartAwareLineEdit(
edit = Edit(
parent=self,
# width_in_chars=6,
)
@ -386,6 +387,7 @@ class FieldsForm(QWidget):
}}
"""
)
edit.setReadOnly(readonly)
edit.setText(str(value))
self.form.addRow(label, edit)
@ -478,13 +480,15 @@ def mk_form(
for key, conf in fields_schema.items():
wtype = conf['type']
label = str(conf.get('label', key))
kwargs = conf.get('kwargs', {})
# plain (line) edit field
if wtype == 'edit':
w = form.add_edit_field(
key,
label,
conf['default_value']
conf['default_value'],
**kwargs,
)
# drop-down selection
@ -493,7 +497,8 @@ def mk_form(
w = form.add_select_field(
key,
label,
values
values,
**kwargs,
)
w._key = key
@ -648,11 +653,21 @@ def mk_fill_status_bar(
font_size=bar_label_font_size,
font_color='gunmetal',
)
# size according to dpi scaled fonted contents to avoid
# resizes on magnitude changes (eg. 9 -> 10 %)
min_w = _font.boundingRect('1000.0M% pnl').width()
left_label.setMinimumWidth(min_w)
left_label.resize(
min_w,
left_label.size().height(),
)
bar_labels_lhs.addSpacing(5/8 * bar_h)
bar_labels_lhs.addWidget(
left_label,
alignment=Qt.AlignLeft | Qt.AlignTop,
# XXX: doesn't seem to actually push up against
# the status bar?
alignment=Qt.AlignRight | Qt.AlignTop,
)
# this hbox is added as a layout by the paner maker/caller

View File

@ -341,7 +341,14 @@ class ChartView(ViewBox):
**kwargs,
):
super().__init__(parent=parent, **kwargs)
super().__init__(
parent=parent,
# TODO: look into the default view padding
# support that might replace somem of our
# ``ChartPlotWidget._set_yrange()`
# defaultPadding=0.,
**kwargs
)
# disable vertical scrolling
self.setMouseEnabled(x=True, y=False)
@ -533,7 +540,6 @@ class ChartView(ViewBox):
# self.updateScaleBox(ev.buttonDownPos(), ev.pos())
else:
# default bevavior: click to pan view
tr = self.childGroup.transform()
tr = fn.invertQTransform(tr)
tr = tr.map(dif*mask) - tr.map(Point(0, 0))

View File

@ -146,7 +146,7 @@ def path_arrays_from_ohlc(
# specifies that the first edge is never connected to the
# prior bars last edge thus providing a small "gap"/"space"
# between bars determined by ``bar_gap``.
c[istart:istop] = (0, 1, 1, 1, 1, 1)
c[istart:istop] = (1, 1, 1, 1, 1, 0)
return x, y, c
@ -182,12 +182,14 @@ class BarItems(pg.GraphicsObject):
# scene: 'QGraphicsScene', # noqa
plotitem: 'pg.PlotItem', # noqa
pen_color: str = 'bracket',
last_bar_color: str = 'bracket',
) -> None:
super().__init__()
# XXX: for the mega-lulz increasing width here increases draw latency...
# so probably don't do it until we figure that out.
# XXX: for the mega-lulz increasing width here increases draw
# latency... so probably don't do it until we figure that out.
self.bars_pen = pg.mkPen(hcolor(pen_color), width=1)
self.last_bar_pen = pg.mkPen(hcolor(last_bar_color), width=2)
# NOTE: this prevents redraws on mouse interaction which is
# a huge boon for avg interaction latency.
@ -354,30 +356,6 @@ class BarItems(pg.GraphicsObject):
if flip_cache:
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
def paint(
self,
p: QtGui.QPainter,
opt: QtWidgets.QStyleOptionGraphicsItem,
w: QtWidgets.QWidget
) -> None:
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
# p.setCompositionMode(0)
p.setPen(self.bars_pen)
# TODO: one thing we could try here is pictures being drawn of
# a fixed count of bars such that based on the viewbox indices we
# only draw the "rounded up" number of "pictures worth" of bars
# as is necesarry for what's in "view". Not sure if this will
# lead to any perf gains other then when zoomed in to less bars
# in view.
p.drawLines(*tuple(filter(bool, self._last_bar_lines)))
profiler('draw last bar')
p.drawPath(self.path)
profiler('draw history path')
def boundingRect(self):
# Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect
@ -421,3 +399,28 @@ class BarItems(pg.GraphicsObject):
)
)
def paint(
self,
p: QtGui.QPainter,
opt: QtWidgets.QStyleOptionGraphicsItem,
w: QtWidgets.QWidget
) -> None:
profiler = pg.debug.Profiler(disabled=not pg_profile_enabled())
# p.setCompositionMode(0)
# TODO: one thing we could try here is pictures being drawn of
# a fixed count of bars such that based on the viewbox indices we
# only draw the "rounded up" number of "pictures worth" of bars
# as is necesarry for what's in "view". Not sure if this will
# lead to any perf gains other then when zoomed in to less bars
# in view.
p.setPen(self.last_bar_pen)
p.drawLines(*tuple(filter(bool, self._last_bar_lines)))
profiler('draw last bar')
p.setPen(self.bars_pen)
p.drawPath(self.path)
profiler('draw history path')

View File

@ -36,7 +36,7 @@ from PyQt5.QtWidgets import (
from ._forms import (
# FontScaledDelegate,
FontAndChartAwareLineEdit,
Edit,
)
@ -97,7 +97,7 @@ class Selection(Field[DataType], Generic[DataType]):
class Edit(Field[DataType], Generic[DataType]):
'''An edit field which takes a number.
'''
widget_factory = FontAndChartAwareLineEdit
widget_factory = Edit
class AllocatorPane(BaseModel):

View File

@ -54,6 +54,7 @@ async def update_pnl_from_feed(
feed: Feed,
order_mode: OrderMode, # noqa
tracker: PositionTracker,
) -> None:
'''Real-time display the current pp's PnL in the appropriate label.
@ -76,7 +77,8 @@ async def update_pnl_from_feed(
types = ('bid', 'last', 'last', 'utrade')
else:
raise RuntimeError('No pp?!?!')
log.info(f'No position (yet) for {tracker.alloc.account}@{key}')
return
# real-time update pnl on the status pane
try:
@ -152,7 +154,7 @@ class SettingsPane:
'''Called on any order pane drop down selection change.
'''
log.info(f'selection input: {text}')
log.info(f'selection input {key}:{text}')
self.on_ui_settings_change(key, text)
def on_ui_settings_change(
@ -209,30 +211,31 @@ class SettingsPane:
# WRITE any settings to current pp's allocator
try:
value = puterize(value)
if key == 'limit':
if size_unit == 'currency':
alloc.currency_limit = value
else:
alloc.units_limit = value
elif key == 'slots':
alloc.slots = int(value)
elif key == 'size_unit':
# TODO: if there's a limit size unit change re-compute
# the current settings in the new units
if key == 'size_unit':
# implicit re-write of value if input
# is the "text name" of the units.
# yah yah, i know this is badd..
alloc.size_unit = value
else:
raise ValueError(f'Unknown setting {key}')
value = puterize(value)
if key == 'limit':
if size_unit == 'currency':
alloc.currency_limit = value
else:
alloc.units_limit = value
elif key == 'slots':
alloc.slots = int(value)
else:
raise ValueError(f'Unknown setting {key}')
log.info(f'settings change: {key}: {value}')
except ValueError:
log.error(f'Invalid value for `{key}`: {value}')
# READ out settings and update UI
# READ out settings and update the status UI / settings widgets
suffix = {'currency': ' $', 'units': ' u'}[size_unit]
limit = alloc.limit()
@ -259,6 +262,9 @@ class SettingsPane:
self.form.fields['slots'].setText(str(alloc.slots))
self.form.fields['limit'].setText(str(limit))
# update of level marker size label based on any new settings
tracker.update_from_pp()
# TODO: maybe return a diff of settings so if we can an error we
# can have general input handling code to report it through the
# UI in some way?
@ -339,6 +345,7 @@ class SettingsPane:
update_pnl_from_feed,
feed,
mode,
tracker,
)
# immediately display in status label

View File

@ -72,7 +72,7 @@ from ._style import (
_font,
hcolor,
)
from ._forms import FontAndChartAwareLineEdit, FontScaledDelegate
from ._forms import Edit, FontScaledDelegate
log = get_logger(__name__)
@ -407,7 +407,7 @@ class CompleterView(QTreeView):
self.resize()
class SearchBar(FontAndChartAwareLineEdit):
class SearchBar(Edit):
mode_name: str = 'search'

View File

@ -110,7 +110,7 @@ class DpiAwareFont:
mx_dpi = max(pdpi, ldpi)
mn_dpi = min(pdpi, ldpi)
scale = round(ldpi/pdpi)
scale = round(ldpi/pdpi, ndigits=2)
if mx_dpi <= 97: # for low dpi use larger font sizes
inches = _font_sizes['lo'][self._font_size]
@ -121,17 +121,29 @@ class DpiAwareFont:
dpi = mn_dpi
# dpi is likely somewhat scaled down so use slightly larger font size
if scale > 1 and self._font_size:
# TODO: this denominator should probably be determined from
# relative aspect ratios or something?
inches = inches * (1 / scale) * (1 + 6/16)
if scale >= 1.1 and self._font_size:
if 1.2 <= scale:
inches *= (1 / scale) * 1.0616
if scale < 1.4 or scale >= 1.5:
# TODO: this denominator should probably be determined from
# relative aspect ratios or something?
inches = inches * (1 + 6/16)
dpi = mx_dpi
log.info(f'USING MAX DPI {dpi}')
# TODO: we might want to fiddle with incrementing font size by
# +1 for the edge cases above. it seems doing it via scaling is
# always going to hit that error in range mapping from inches:
# float to px size: int.
self._font_inches = inches
font_size = math.floor(inches * dpi)
log.debug(
f"\nscreen:{screen.name()} with pDPI: {pdpi}, lDPI: {ldpi}"
log.info(
f"screen:{screen.name()}]\n"
f"pDPI: {pdpi}, lDPI: {ldpi}, scale: {scale}\n"
f"\nOur best guess font size is {font_size}\n"
)
# apply the size
@ -205,19 +217,26 @@ def hcolor(name: str) -> str:
'svags': '#0a0e14',
# fifty shades
'original': '#a9a9a9',
'gray': '#808080', # like the kick
'grayer': '#4c4c4c',
'grayest': '#3f3f3f',
'i3': '#494D4F',
'jet': '#343434',
'cadet': '#91A3B0',
'marengo': '#91A3B0',
'charcoal': '#36454F',
'gunmetal': '#91A3B0',
'battleship': '#848482',
'davies': '#555555',
# bluish
'charcoal': '#36454F',
# default bars
'bracket': '#666666', # like the logo
'original': '#a9a9a9',
# work well for filled polygons which want a 'bracket' feel
# going light to dark
'davies': '#555555',
'i3': '#494D4F',
'jet': '#343434',
# from ``qdarkstyle`` palette
'default_darkest': DarkPalette.COLOR_BACKGROUND_1,

View File

@ -47,7 +47,7 @@ from ._position import (
)
from ._label import FormatLabel
from ._window import MultiStatus
from ..clearing._messages import Order
from ..clearing._messages import Order, BrokerdPosition
from ._forms import open_form_input_handling
@ -529,7 +529,12 @@ async def open_order_mode(
book: OrderBook
trades_stream: tractor.MsgStream
position_msgs: dict
# The keys in this dict **must** be in set our set of "normalized"
# symbol names (i.e. the same names you'd get back in search
# results) in order for position msgs to correctly trigger the
# display of a position indicator on screen.
position_msgs: dict[str, list[BrokerdPosition]]
# spawn EMS actor-service
async with (
@ -563,7 +568,9 @@ async def open_order_mode(
providers=symbol.brokers
)
# use only loaded accounts according to brokerd
# XXX: ``brokerd`` delivers a set of account names that it allows
# use of but the user also can define the accounts they'd like
# to use, in order, in their `brokers.toml` file.
accounts = {}
for name in brokerd_accounts:
# ensure name is in ``brokers.toml``
@ -571,7 +578,10 @@ async def open_order_mode(
# first account listed is the one we select at startup
# (aka order based selection).
pp_account = next(iter(accounts.keys())) if accounts else 'paper'
pp_account = next(
# choose first account based on line order from `brokers.toml`.
iter(accounts.keys())
) if accounts else 'paper'
# NOTE: requires the backend exactly specifies
# the expected symbol key in its positions msg.
@ -617,8 +627,8 @@ async def open_order_mode(
# alloc?
pp_tracker.update_from_pp()
# on existing position, show pp tracking graphics
if pp_tracker.startup_pp.size != 0:
# if no position, don't show pp tracking graphics
pp_tracker.show()
pp_tracker.hide_info()
@ -802,12 +812,13 @@ async def process_trades_and_update_ui(
tracker = mode.trackers[msg['account']]
tracker.live_pp.update_from_msg(msg)
tracker.update_from_pp()
# update order pane widgets
tracker.update_from_pp()
mode.pane.update_status_ui(tracker)
# display pnl
mode.pane.display_pnl(tracker)
if tracker.live_pp.size:
# display pnl
mode.pane.display_pnl(tracker)
# short circuit to next msg to avoid
# unnecessary msg content lookups

View File

@ -25,6 +25,8 @@ import i3ipc
i3 = i3ipc.Connection()
t = i3.get_tree()
orig_win_id = t.find_focused().window
# for tws
win_names: list[str] = [
'Interactive Brokers', # tws running in i3
@ -51,11 +53,20 @@ for name in win_names:
# move mouse to bottom left of window (where there should
# be nothing to click).
'mousemove_relative', '--sync', str(w-3), str(h-3),
'mousemove_relative', '--sync', str(w-4), str(h-4),
# NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id, '1',
'click', '--window', win_id, '--repeat', '3', '1',
# hackzorzes
'key', 'ctrl+alt+f',
])
],
timeout=1,
)
# re-activate and focus original window
subprocess.call([
'xdotool',
'windowactivate', '--sync', str(orig_win_id),
'click', '--window', str(orig_win_id), '1',
])