Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet ad565936ec Factor UI-rc loop into ctx-free func
In theory the `async for msg` loop can be re-purposed without always
having to call `remote_annotate()`, so factor it into a new
`serve_rc_annots()` and (for now) just call it from the former, keeping
the wrapping `try:` block outside so per-client-ctx annotation instance
sets still get deleted on teardown; see the sketch below. Also, use
type aliases instead of repeatedly defining the same complex
`dict`-table defs B)
2023-12-26 20:56:04 -05:00
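For orientation, a rough sketch of the shape this refactor describes, inferred from the commit message and the `ui._remote_ctl` diff further below; the exact signatures and table contents in the real module differ slightly:

import tractor
from tractor import Context, MsgStream

# type aliases instead of re-declaring the same complex `dict`-table defs
IpcCtxTable = dict[str, tuple[Context, set[int]]]
AnnotsTable = dict[int, object]  # annot-id -> graphics instance

_ctxs: IpcCtxTable = {}
_annots: AnnotsTable = {}


async def serve_rc_annots(
    ipc_key: str,
    annot_req_stream: MsgStream,
    ctxs: IpcCtxTable,
    annots: AnnotsTable,

) -> None:
    # ctx-free msg-dispatch loop: no per-client setup/teardown in here
    # so it can be re-used without going through `remote_annotate()`.
    async for msg in annot_req_stream:
        ...


@tractor.context
async def remote_annotate(ctx: Context) -> None:
    _ctxs[ctx.cid] = (ctx, set())
    await ctx.started()

    async with ctx.open_stream() as annot_req_stream:
        try:
            await serve_rc_annots(
                ipc_key=ctx.cid,
                annot_req_stream=annot_req_stream,
                ctxs=_ctxs,
                annots=_annots,
            )
        finally:
            # the wrapping `try:` stays out here: drop this
            # client-ctx's annotation instance set on teardown.
            _, aids = _ctxs.pop(ctx.cid)
            for aid in aids:
                _annots.pop(aid, None)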
Tyler Goodlet d4b07cc95a `ui._lines`: more direct Qt imports for typing 2023-12-26 20:49:07 -05:00
Tyler Goodlet 1231c459aa Track data feed subscribers using a new `Sub(Struct)`
In prep for supporting reverse-IPC connect-back to UI actors from
middleware systems (for the purpose of triggering data-view canvas
re-renders and built-in tsp annotations), add a new struct type to
better generalize the management of remote feed subscriptions; a sketch
of the new type follows this entry. Include a `Sub.rc_ui: bool` for now
(with a nearby todo-comment) and expose an `allow_remote_ctl_ui: bool`
through the feed endpoints to help drive / prep for all that ^

Rework all the sampler tasks to expect the `Sub`'s new iface:

- split up the `Sub.ipc: MsgStream` and `.send_chan` into separate
  fields since we're handling the throttle case in separate
  `sample_and_broadcast()` logic blocks anyway, which avoids needing to
  monkey-patch on the `._ctx` malarkey..
- explicitly provide the optional handle to the `_throttle_cs:
  CancelScope` again for the case where throttling/event-downsampling is
  requested.
- add `_FeedsBus.subs_items()` as a public iterator.
2023-12-26 20:48:06 -05:00
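Condensed from the `piker/data/feed.py` and `_sampling.py` diffs below, the new type and the field-access style the sampler now uses look roughly like this; the `relay_quote()` helper is hypothetical, standing in for the relevant `sample_and_broadcast()` branch:

import trio
import tractor
from piker.types import Struct


class Sub(Struct, frozen=True):
    '''
    A live feed subscription entry.

    '''
    ipc: tractor.MsgStream                         # per-consumer IPC stream
    send_chan: trio.abc.SendChannel | None = None  # only set when throttling
    throttle_rate: float | None = None             # tick throttle in Hz
    _throttle_cs: trio.CancelScope | None = None
    rc_ui: bool = False                            # remote-ctl-ui allowed?


# hypothetical sketch of the sampler-side usage: no more
# `(stream, tick_throttle)` tuples or `._ctx` monkey-patching,
# just field access on the `Sub`.
async def relay_quote(sub: Sub, fqme: str, quote: dict) -> None:
    if sub.throttle_rate:
        # throttled path: push to the mem-chan drained by
        # `uniform_rate_send()` at the requested Hz.
        sub.send_chan.send_nowait((fqme, quote))
    else:
        # direct path: send straight over the IPC msg stream.
        await sub.ipc.send({fqme: quote})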
Tyler Goodlet 88f415e5b8 Cannot delete when the rect has no scene.. 2023-12-26 17:36:34 -05:00
Tyler Goodlet d9c574e291 Add `.sort()` support to `dedupe()` 2023-12-26 17:35:38 -05:00
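A simplified sketch of just the dedupe step this touches; the real `dedupe()` (see the `tsp/_anal.py` diff below) also returns the gap frames and a row-count diff:

import polars as pl


def dedupe(src_df: pl.DataFrame, sort: bool = True) -> pl.DataFrame:
    # drop rows with duplicate datetime stamps, keeping first occurrence
    deduped: pl.DataFrame = src_df.unique(
        subset=['dt'],
        maintain_order=True,
    )
    if sort:
        # new: optionally re-sort by the epoch `time` column afterwards
        deduped = deduped.sort(by='time')

    return deduped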
Tyler Goodlet a86573b5a2 Fix .parquet filenaming..
Apparently `.storage.nativedb.mk_ohlcv_shm_keyed_filepath()` was always
kinda broken if you passed a `period: float` holding a non-integer value
into the format string? Fix it to strictly cast to `int()` before
str-ifying so you don't get weird `60.0s.parquet` names in there..

Further this rejigs the `store ldshm` gap correction-annotation loop to,
- use `StorageClient.write_ohlcv()` instead of hackily re-implementing
  it.. now that the problem from above is fixed!
- use a `needs_correction: bool` var to determine if gap markup and
  de-duplicated data should be pushed to the shm buffer,
- go back to using `AnnotCtl.add_rect()` for all detected gaps such that
  they all persist (and thus are shown together) until the client
  disconnects.
2023-12-26 17:14:26 -05:00
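The naming fix itself boils down to int-casting the period before formatting, per the `storage/nativedb.py` diff below:

from pathlib import Path


def mk_ohlcv_shm_keyed_filepath(
    fqme: str,
    period: float | int,  # aka the "timeframe"
    datadir: Path,

) -> Path:
    if period < 1.:
        raise ValueError('Sample period should be >= 1.!?')

    # strictly cast to `int` before str-ifying so a float input such as
    # 60.0 yields `<fqme>.ohlcv60s.parquet` instead of `...ohlcv60.0s...`
    return datadir / f'{fqme}.ohlcv{int(period)}s.parquet'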
Tyler Goodlet 1d7e97a295 Woops, need to use `.push_async_callback()`
For handlers that aren't full `.__aexit__()` impls you need this method
instead (facepalm); a small usage sketch follows this entry. Also create
and assign the `AnnotCtl._annot_stack: AsyncExitStack` just before
yielding the client since it's not needed prior, and this ensures annot
removal happens **before** IPC teardown.
2023-12-24 15:08:44 -05:00
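The distinction is a stdlib one: `AsyncExitStack.push_async_exit()` expects an async-context-manager style `.__aexit__()` callable, while a plain async callable (like `AnnotCtl.remove()`) must be registered via `.push_async_callback()`. A minimal stdlib-only sketch (the `remove()` stub is hypothetical):

from contextlib import AsyncExitStack
from functools import partial


async def remove(aid: int) -> bool:
    # stand-in for `AnnotCtl.remove()`: a plain async callable,
    # not an object implementing `.__aexit__()`.
    return True


async def demo() -> None:
    async with AsyncExitStack() as annot_stack:
        aid: int = 123
        # register `remove(aid)` to run when the stack unwinds, i.e.
        # annot removal happens before any outer IPC teardown that
        # wraps this stack.
        annot_stack.push_async_callback(partial(remove, aid))
        # ... use the annotation here ...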
Tyler Goodlet bbb98597a0 Add annot removal via client methods or ctx-mngr
Since leaking annots to a remote `chart` actor probably isn't a thing we
want to do (often), add a removal/deletion handler block to the
`remote_annotate()` ctx which can be triggered using a `{rm_annot: aid}`
msg.

Augment the `AnnotCtl` with,
- `.remove()` which sends said msg (from above) and returns a `bool`
  indicating success.
- add an `.open_rect()` acm which does the `.add_rect()` / `.remove()`
  calls underneath for scope-oriented client usage (sketched below).
- add a `._annot_stack: AsyncExitStack` which ensures any/all
  non-`.open_rect()` calls to `.add_rect()` register removal on client
  teardown, to avoid leaking annots when a client finally disconnects.
- comment out the `.modify()` meth idea for now.
- rename all `Xstream` var-tags to `Xipc` names.
2023-12-24 14:42:12 -05:00
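A sketch of the scope-oriented usage the new `.open_rect()` acm enables; the method bodies are stubbed here, the real ones round-trip over the annotation IPC stream as shown in the `ui/_remote_ctl.py` diff below:

from contextlib import asynccontextmanager as acm


class AnnotCtl:

    async def add_rect(self, from_acm: bool = False, **kwargs) -> int:
        ...  # sends the `SelectRect` cmd msg, returns the remote annot id

    async def remove(self, aid: int) -> bool:
        ...  # sends `{'rm_annot': aid}`, returns deletion confirmation

    @acm
    async def open_rect(self, **kwargs):
        # `from_acm=True` skips registering removal on `._annot_stack`
        # since this scope already guarantees cleanup below.
        aid: int = await self.add_rect(from_acm=True, **kwargs)
        try:
            yield aid
        finally:
            await self.remove(aid)


# usage: the rect only lives for the duration of the `async with`,
# eg. inside the `store ldshm` gap-markup loop:
#
# async with actl.open_rect(
#     fqme=fqme,
#     timeframe=period_s,
#     start_pos=lc,
#     end_pos=ro,
# ) as aid:
#     ...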
Tyler Goodlet e33d6333ec Woops, remove the label-proxy, not the widget.. 2023-12-24 13:59:16 -05:00
Tyler Goodlet 263a5a8d07 Add `SelectRect.delete()` for permanent scene dealloc 2023-12-23 13:37:47 -05:00
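The `SelectRect` scene-dealloc commits above (including the "no scene" guard) boil down to the following method, condensed from the `ui/_editors.py` diff below: deletion is a no-op when the rect was never added to a scene, otherwise the rect and its label proxy are removed from the Qt graphics scene.

from PyQt5 import QtWidgets
from PyQt5.QtWidgets import QGraphicsScene


class SelectRect(QtWidgets.QGraphicsRectItem):

    def delete(self) -> None:
        '''
        De-allocate this rect from its rendering graphics scene;
        like a permanent hide.

        '''
        scen: QGraphicsScene | None = self.scene()
        if scen is None:
            # "cannot delete when the rect has no scene.."
            return

        scen.removeItem(self)
        # also drop the label's scene-proxy (not the label widget)
        if self._label and self._label_proxy:
            scen.removeItem(self._label_proxy)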
10 changed files with 553 additions and 294 deletions

View File

@ -33,6 +33,11 @@ from typing import (
) )
import tractor import tractor
from tractor import (
Context,
MsgStream,
Channel,
)
from tractor.trionics import ( from tractor.trionics import (
maybe_open_nursery, maybe_open_nursery,
) )
@ -53,7 +58,10 @@ if TYPE_CHECKING:
from ._sharedmem import ( from ._sharedmem import (
ShmArray, ShmArray,
) )
from .feed import _FeedsBus from .feed import (
_FeedsBus,
Sub,
)
# highest frequency sample step is 1 second by default, though in # highest frequency sample step is 1 second by default, though in
@ -94,7 +102,7 @@ class Sampler:
float, float,
list[ list[
float, float,
set[tractor.MsgStream] set[MsgStream]
], ],
] = defaultdict( ] = defaultdict(
lambda: [ lambda: [
@ -258,8 +266,8 @@ class Sampler:
f'broadcasting {period_s} -> {last_ts}\n' f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}' # f'consumers: {subs}'
) )
borked: set[tractor.MsgStream] = set() borked: set[MsgStream] = set()
sent: set[tractor.MsgStream] = set() sent: set[MsgStream] = set()
while True: while True:
try: try:
for stream in (subs - sent): for stream in (subs - sent):
@ -314,7 +322,7 @@ class Sampler:
@tractor.context @tractor.context
async def register_with_sampler( async def register_with_sampler(
ctx: tractor.Context, ctx: Context,
period_s: float, period_s: float,
shms_by_period: dict[float, dict] | None = None, shms_by_period: dict[float, dict] | None = None,
@ -649,12 +657,7 @@ async def sample_and_broadcast(
# eventually block this producer end of the feed and # eventually block this producer end of the feed and
# thus other consumers still attached. # thus other consumers still attached.
sub_key: str = broker_symbol.lower() sub_key: str = broker_symbol.lower()
subs: list[ subs: set[Sub] = bus.get_subs(sub_key)
tuple[
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]
] = bus.get_subs(sub_key)
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
@ -663,34 +666,40 @@ async def sample_and_broadcast(
fqme: str = f'{broker_symbol}.{brokername}' fqme: str = f'{broker_symbol}.{brokername}'
lags: int = 0 lags: int = 0
# TODO: speed up this loop in an AOT compiled lang (like # XXX TODO XXX: speed up this loop in an AOT compiled
# rust or nim or zig) and/or instead of doing a fan out to # lang (like rust or nim or zig)!
# TCP sockets here, we add a shm-style tick queue which # AND/OR instead of doing a fan out to TCP sockets
# readers can pull from instead of placing the burden of # here, we add a shm-style tick queue which readers can
# broadcast on solely on this `brokerd` actor. see issues: # pull from instead of placing the burden of broadcast
# on solely on this `brokerd` actor. see issues:
# - https://github.com/pikers/piker/issues/98 # - https://github.com/pikers/piker/issues/98
# - https://github.com/pikers/piker/issues/107 # - https://github.com/pikers/piker/issues/107
for (stream, tick_throttle) in subs.copy(): # for (stream, tick_throttle) in subs.copy():
for sub in subs.copy():
ipc: MsgStream = sub.ipc
throttle: float = sub.throttle_rate
try: try:
with trio.move_on_after(0.2) as cs: with trio.move_on_after(0.2) as cs:
if tick_throttle: if throttle:
send_chan: trio.abc.SendChannel = sub.send_chan
# this is a send mem chan that likely # this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below. # pushes to the ``uniform_rate_send()`` below.
try: try:
stream.send_nowait( send_chan.send_nowait(
(fqme, quote) (fqme, quote)
) )
except trio.WouldBlock: except trio.WouldBlock:
overruns[sub_key] += 1 overruns[sub_key] += 1
ctx = stream._ctx ctx: Context = ipc._ctx
chan = ctx.chan chan: Channel = ctx.chan
log.warning( log.warning(
f'Feed OVERRUN {sub_key}' f'Feed OVERRUN {sub_key}'
'@{bus.brokername} -> \n' '@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n' f'feed @ {chan.uid}\n'
f'throttle = {tick_throttle} Hz' f'throttle = {throttle} Hz'
) )
if overruns[sub_key] > 6: if overruns[sub_key] > 6:
@ -707,10 +716,10 @@ async def sample_and_broadcast(
f'{sub_key}:' f'{sub_key}:'
f'{ctx.cid}@{chan.uid}' f'{ctx.cid}@{chan.uid}'
) )
await stream.aclose() await ipc.aclose()
raise trio.BrokenResourceError raise trio.BrokenResourceError
else: else:
await stream.send( await ipc.send(
{fqme: quote} {fqme: quote}
) )
@ -724,16 +733,16 @@ async def sample_and_broadcast(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
): ):
ctx = stream._ctx ctx: Context = ipc._ctx
chan = ctx.chan chan: Channel = ctx.chan
if ctx: if ctx:
log.warning( log.warning(
'Dropped `brokerd`-quotes-feed connection:\n' 'Dropped `brokerd`-quotes-feed connection:\n'
f'{broker_symbol}:' f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}' f'{ctx.cid}@{chan.uid}'
) )
if tick_throttle: if sub.throttle_rate:
assert stream._closed assert ipc._closed
# XXX: do we need to deregister here # XXX: do we need to deregister here
# if it's done in the fee bus code? # if it's done in the fee bus code?
@ -742,7 +751,7 @@ async def sample_and_broadcast(
# since there seems to be some kinda race.. # since there seems to be some kinda race..
bus.remove_subs( bus.remove_subs(
sub_key, sub_key,
{(stream, tick_throttle)}, {sub},
) )
@ -750,7 +759,7 @@ async def uniform_rate_send(
rate: float, rate: float,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream, stream: MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED, task_status: TaskStatus = trio.TASK_STATUS_IGNORED,

View File

@ -28,6 +28,7 @@ module.
from __future__ import annotations from __future__ import annotations
from collections import ( from collections import (
defaultdict, defaultdict,
abc,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
@ -36,7 +37,6 @@ from types import ModuleType
from typing import ( from typing import (
Any, Any,
AsyncContextManager, AsyncContextManager,
Optional,
Awaitable, Awaitable,
Sequence, Sequence,
) )
@ -76,6 +76,31 @@ from ._sampling import (
) )
class Sub(Struct, frozen=True):
'''
A live feed subscription entry.
Contains meta-data on the remote-actor type (in functionality
terms) as well as refs to IPC streams and sampler runtime
params.
'''
ipc: tractor.MsgStream
send_chan: trio.abc.SendChannel | None = None
# tick throttle rate in Hz; determines how live
# quotes/ticks should be downsampled before relay
# to the receiving remote consumer (process).
throttle_rate: float | None = None
_throttle_cs: trio.CancelScope | None = None
# TODO: actually stash comms info for the far end to allow
# `.tsp`, `.fsp` and `.data._sampling` sub-systems to re-render
# the data view as needed via msging with the `._remote_ctl`
# ipc ctx.
rc_ui: bool = False
class _FeedsBus(Struct): class _FeedsBus(Struct):
''' '''
Data feeds broadcaster and persistence management. Data feeds broadcaster and persistence management.
@ -100,13 +125,7 @@ class _FeedsBus(Struct):
_subscribers: defaultdict[ _subscribers: defaultdict[
str, str,
set[ set[Sub]
tuple[
tractor.MsgStream | trio.MemorySendChannel,
# tractor.Context,
float | None, # tick throttle in Hz
]
]
] = defaultdict(set) ] = defaultdict(set)
async def start_task( async def start_task(
@ -140,31 +159,28 @@ class _FeedsBus(Struct):
def get_subs( def get_subs(
self, self,
key: str, key: str,
) -> set[
tuple[ ) -> set[Sub]:
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]
]:
''' '''
Get the ``set`` of consumer subscription entries for the given key. Get the ``set`` of consumer subscription entries for the given key.
''' '''
return self._subscribers[key] return self._subscribers[key]
def subs_items(self) -> abc.ItemsView[str, set[Sub]]:
return self._subscribers.items()
def add_subs( def add_subs(
self, self,
key: str, key: str,
subs: set[tuple[ subs: set[Sub],
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz ) -> set[Sub]:
]],
) -> set[tuple]:
''' '''
Add a ``set`` of consumer subscription entries for the given key. Add a ``set`` of consumer subscription entries for the given key.
''' '''
_subs: set[tuple] = self._subscribers[key] _subs: set[Sub] = self._subscribers.setdefault(key, set())
_subs.update(subs) _subs.update(subs)
return _subs return _subs
@ -441,8 +457,9 @@ async def open_feed_bus(
symbols: list[str], # normally expected to the broker-specific fqme symbols: list[str], # normally expected to the broker-specific fqme
loglevel: str = 'error', loglevel: str = 'error',
tick_throttle: Optional[float] = None, tick_throttle: float | None = None,
start_stream: bool = True, start_stream: bool = True,
allow_remote_ctl_ui: bool = False,
) -> dict[ ) -> dict[
str, # fqme str, # fqme
@ -519,10 +536,10 @@ async def open_feed_bus(
# pack for ``.started()`` sync msg # pack for ``.started()`` sync msg
flumes[fqme] = flume flumes[fqme] = flume
# we use the broker-specific fqme (bs_fqme) for the # we use the broker-specific fqme (bs_fqme) for the sampler
# sampler subscription since the backend isn't (yet) expected to # subscription since the backend isn't (yet) expected to
# append it's own name to the fqme, so we filter on keys which # append it's own name to the fqme, so we filter on keys
# *do not* include that name (e.g .ib) . # which *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bs_fqme, set()) bus._subscribers.setdefault(bs_fqme, set())
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
@ -561,49 +578,60 @@ async def open_feed_bus(
# that the ``sample_and_broadcast()`` task (spawned inside # that the ``sample_and_broadcast()`` task (spawned inside
# ``allocate_persistent_feed()``) will push real-time quote # ``allocate_persistent_feed()``) will push real-time quote
# (ticks) to this new consumer. # (ticks) to this new consumer.
cs: trio.CancelScope | None = None
send: trio.MemorySendChannel | None = None
if tick_throttle: if tick_throttle:
flume.throttle_rate = tick_throttle flume.throttle_rate = tick_throttle
# open a bg task which receives quotes over a mem chan # open a bg task which receives quotes over a mem
# and only pushes them to the target actor-consumer at # chan and only pushes them to the target
# a max ``tick_throttle`` instantaneous rate. # actor-consumer at a max ``tick_throttle``
# (instantaneous) rate.
send, recv = trio.open_memory_channel(2**10) send, recv = trio.open_memory_channel(2**10)
cs = await bus.start_task( # NOTE: the ``.send`` channel here is a swapped-in
# trio mem chan which gets `.send()`-ed by the normal
# sampler task but instead of being sent directly
# over the IPC msg stream it's the throttle task
# does the work of incrementally forwarding to the
# IPC stream at the throttle rate.
cs: trio.CancelScope = await bus.start_task(
uniform_rate_send, uniform_rate_send,
tick_throttle, tick_throttle,
recv, recv,
stream, stream,
) )
# NOTE: so the ``send`` channel here is actually a swapped
# in trio mem chan which gets pushed by the normal sampler
# task but instead of being sent directly over the IPC msg
# stream it's the throttle task does the work of
# incrementally forwarding to the IPC stream at the throttle
# rate.
send._ctx = ctx # mock internal ``tractor.MsgStream`` ref
sub = (send, tick_throttle)
else: sub = Sub(
sub = (stream, tick_throttle) ipc=stream,
send_chan=send,
throttle_rate=tick_throttle,
_throttle_cs=cs,
rc_ui=allow_remote_ctl_ui,
)
# TODO: add an api for this on the bus? # TODO: add an api for this on the bus?
# maybe use the current task-id to key the sub list that's # maybe use the current task-id to key the sub list that's
# added / removed? Or maybe we can add a general # added / removed? Or maybe we can add a general
# pause-resume by sub-key api? # pause-resume by sub-key api?
bs_fqme = fqme.removesuffix(f'.{brokername}') bs_fqme = fqme.removesuffix(f'.{brokername}')
local_subs.setdefault(bs_fqme, set()).add(sub) local_subs.setdefault(
bus.add_subs(bs_fqme, {sub}) bs_fqme,
set()
).add(sub)
bus.add_subs(
bs_fqme,
{sub}
)
# sync caller with all subs registered state # sync caller with all subs registered state
sub_registered.set() sub_registered.set()
uid = ctx.chan.uid uid: tuple[str, str] = ctx.chan.uid
try: try:
# ctrl protocol for start/stop of quote streams based on UI # ctrl protocol for start/stop of live quote streams
# state (eg. don't need a stream when a symbol isn't being # based on UI state (eg. don't need a stream when
# displayed). # a symbol isn't being displayed).
async for msg in stream: async for msg in stream:
if msg == 'pause': if msg == 'pause':
@ -760,7 +788,7 @@ async def install_brokerd_search(
async def maybe_open_feed( async def maybe_open_feed(
fqmes: list[str], fqmes: list[str],
loglevel: Optional[str] = None, loglevel: str | None = None,
**kwargs, **kwargs,
@ -820,6 +848,8 @@ async def open_feed(
start_stream: bool = True, start_stream: bool = True,
tick_throttle: float | None = None, # Hz tick_throttle: float | None = None, # Hz
allow_remote_ctl_ui: bool = False,
) -> Feed: ) -> Feed:
''' '''
Open a "data feed" which provides streamed real-time quotes. Open a "data feed" which provides streamed real-time quotes.
@ -902,6 +932,12 @@ async def open_feed(
# of these stream open sequences sequentially per # of these stream open sequences sequentially per
# backend? .. need some thot! # backend? .. need some thot!
allow_overruns=True, allow_overruns=True,
# NOTE: UI actors (like charts) can allow
# remote control of certain graphics rendering
# capabilities via the
# `.ui._remote_ctl.remote_annotate()` msg loop.
allow_remote_ctl_ui=allow_remote_ctl_ui,
) )
) )

View File

@ -20,8 +20,12 @@ Storage middle-ware CLIs.
""" """
from __future__ import annotations from __future__ import annotations
# from datetime import datetime # from datetime import datetime
# from contextlib import (
# AsyncExitStack,
# )
from pathlib import Path from pathlib import Path
import time import time
from types import ModuleType
import polars as pl import polars as pl
import numpy as np import numpy as np
@ -34,7 +38,6 @@ import typer
from piker.service import open_piker_runtime from piker.service import open_piker_runtime
from piker.cli import cli from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import ( from piker.data import (
ShmArray, ShmArray,
) )
@ -45,6 +48,7 @@ from . import (
from . import ( from . import (
__tsdbs__, __tsdbs__,
open_storage_client, open_storage_client,
StorageClient,
) )
@ -232,7 +236,8 @@ def anal(
@store.command() @store.command()
def ldshm( def ldshm(
fqme: str, fqme: str,
write_parquet: bool = False, write_parquet: bool = True,
reload_parquet_to_shm: bool = True,
) -> None: ) -> None:
''' '''
@ -242,15 +247,32 @@ def ldshm(
''' '''
async def main(): async def main():
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
actl: AnnotCtl
mod: ModuleType
client: StorageClient
async with ( async with (
open_piker_runtime( open_piker_runtime(
'polars_boi', 'polars_boi',
enable_modules=['piker.data._sharedmem'], enable_modules=['piker.data._sharedmem'],
debug_mode=True, debug_mode=True,
), ),
open_storage_client() as (
mod,
client,
),
open_annot_ctl() as actl,
): ):
df: pl.DataFrame | None = None shm_df: pl.DataFrame | None = None
for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme): for (
shmfile,
shm,
# parquet_path,
shm_df,
) in tsp.iter_dfs_from_shms(fqme):
# compute ohlc properties for naming # compute ohlc properties for naming
times: np.ndarray = shm.array['time'] times: np.ndarray = shm.array['time']
@ -275,122 +297,136 @@ def ldshm(
period=period_s, period=period_s,
) )
# TODO: maybe only optionally enter this depending needs_correction: bool = (
# on some CLI flags and/or gap detection?
if (
not gaps.is_empty() not gaps.is_empty()
or null_segs or null_segs
): )
from piker.ui._remote_ctl import ( # TODO: maybe only optionally enter this depending
open_annot_ctl, # on some CLI flags and/or gap detection?
AnnotCtl, if needs_correction:
) for i in range(gaps.height):
annot_ctl: AnnotCtl row: pl.DataFrame = gaps[i]
async with open_annot_ctl() as annot_ctl:
for i in range(gaps.height):
row: pl.DataFrame = gaps[i] # TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# TODO: can we eventually remove this # the gap's right-most bar's OPEN value
# once we figure out why the epoch cols # at that time (sample) step.
# don't match? # dt_end_t: float = dt.timestamp()
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# the gap's right-most bar's OPEN value # TODO: FIX HOW/WHY these aren't matching
# at that time (sample) step. # and are instead off by 4hours (EST
# dt_end_t: float = dt.timestamp() # vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# TODO: FIX HOW/WHY these aren't matching # the gap's left-most bar's CLOSE value
# and are instead off by 4hours (EST # at that time (sample) step.
# vs. UTC?!?!) prev_r: pl.DataFrame = df.filter(
# end_t: float = row['time'] pl.col('index') == iend - 1
# assert ( )
# dt.timestamp() istart: int = prev_r['index'][0]
# == # dt_start_t: float = dt_prev.timestamp()
# end_t
# )
# the gap's left-most bar's CLOSE value # start_t: float = prev_r['time']
# at that time (sample) step. # assert (
# dt_start_t
# ==
# start_t
# )
prev_r: pl.DataFrame = df.filter( # TODO: implement px-col width measure
pl.col('index') == gaps[0]['index'] - 1 # and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
)
# async with actl.open_rect(
# ) as aid:
aid: int = await actl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
# write to parquet file?
if (
write_parquet
):
# write to fs
start = time.time()
path: Path = await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period_s,
)
write_delay: float = round(
time.time() - start,
ndigits=6,
)
# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
read_delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {write_delay} secs\n'
f'file path: {path}'
f'parquet read took {read_delay} secs\n'
f'polars df: {read_df}'
)
if reload_parquet_to_shm:
new = tsp.pl2np(
deduped,
dtype=shm.array.dtype,
) )
istart: int = prev_r['index'][0] # since normally readonly
# dt_start_t: float = dt_prev.timestamp() shm._array.setflags(
write=int(1),
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
# await tractor.pause()
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
) )
lc: tuple[float, float] = ( shm.push(
# dt_start_t, new,
istart, prepend=True,
prev_r['close'][0], start=new['index'][-1],
update_first=False, # don't update ._first
) )
aid: int = await annot_ctl.add_rect( await tractor.pause()
fqme=fqme, assert diff
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
await tractor.pause()
# write to parquet file? else:
if write_parquet: # allow interaction even when no ts problems.
timeframe: str = f'{period_s}s' await tractor.pause()
assert not diff
datadir: Path = get_conf_dir() / 'nativedb'
if not datadir.is_dir():
datadir.mkdir()
path: Path = datadir / f'{fqme}.{timeframe}.parquet'
# write to fs
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'parquet read took {delay} secs\n'
f'polars df: {read_df}'
)
if df is None: if df is None:
log.error(f'No matching shm buffers for {fqme} ?') log.error(f'No matching shm buffers for {fqme} ?')

View File

@@ -95,16 +95,19 @@ def detect_period(shm: ShmArray) -> float:
 def mk_ohlcv_shm_keyed_filepath(
     fqme: str,
-    period: float,  # ow known as the "timeframe"
+    period: float | int,  # ow known as the "timeframe"
     datadir: Path,
 
-) -> str:
+) -> Path:
     if period < 1.:
         raise ValueError('Sample period should be >= 1.!?')
 
-    period_s: str = f'{period}s'
-    path: Path = datadir / f'{fqme}.ohlcv{period_s}.parquet'
+    path: Path = (
+        datadir
+        /
+        f'{fqme}.ohlcv{int(period)}s.parquet'
+    )
     return path
 
@@ -227,6 +230,7 @@ class NativeStorageClient:
         self,
         fqme: str,
         period: float,
+
     ) -> Path:
         return mk_ohlcv_shm_keyed_filepath(
             fqme=fqme,
 
@@ -239,6 +243,7 @@ class NativeStorageClient:
         fqme: str,
         df: pl.DataFrame,
         timeframe: float,
+
     ) -> None:
         # cache df for later usage since we (currently) need to
         # convert to np.ndarrays to push to our `ShmArray` rt

View File

@@ -120,7 +120,7 @@ from ..storage import TimeseriesNotFound
 if TYPE_CHECKING:
     from bidict import bidict
     from ..service.marketstore import StorageClient
-    from .feed import _FeedsBus
+    # from .feed import _FeedsBus
 
 
 # `ShmArray` buffer sizing configuration:
 
@@ -1352,9 +1352,7 @@ def iter_dfs_from_shms(
             readonly=True,
         )
         assert not opened
-        ohlcv = shm.array
-
-        from ._anal import np2pl
+        ohlcv: np.ndarray = shm.array
         df: pl.DataFrame = np2pl(ohlcv)
 
         yield (

View File

@@ -620,7 +620,11 @@ def detect_price_gaps(
     ...
 
 
-def dedupe(src_df: pl.DataFrame) -> tuple[
+def dedupe(
+    src_df: pl.DataFrame,
+    sort: bool = True,
+
+) -> tuple[
     pl.DataFrame,  # with dts
     pl.DataFrame,  # gaps
     pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
 
@@ -634,6 +638,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
     '''
     df: pl.DataFrame = with_dts(src_df)
+
+    # TODO: enable passing existing `with_dts` df for speedup?
     gaps: pl.DataFrame = detect_time_gaps(df)
 
     # if no gaps detected just return carbon copies
 
@@ -651,8 +657,10 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
         subset=['dt'],
         maintain_order=True,
     )
+    if sort:
+        deduped = deduped.sort(by='time')
 
-    deduped_gaps = detect_time_gaps(deduped)
+    deduped_gaps: pl.DataFrame = detect_time_gaps(deduped)
 
     diff: int = (
         df.height
 
@@ -660,7 +668,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
         deduped.height
     )
     log.warning(
-        f'Gaps found:\n{gaps}\n'
+        f'TIME GAPs FOUND:\n'
+        # f'{gaps}\n'
         f'deduped Gaps found:\n{deduped_gaps}'
     )
     return (

View File

@@ -471,6 +471,9 @@ async def graphics_update_loop(
         await tractor.pause()
 
     try:
+        # XXX TODO: we need to do _dss UPDATE here so that when
+        # a feed-view is switched you can still remote annotate the
+        # prior view..
         from . import _remote_ctl
         _remote_ctl._dss = dss
 
@@ -526,7 +529,7 @@ async def graphics_update_loop(
     finally:
         # XXX: cancel any remote annotation control ctxs
         _remote_ctl._dss = None
-        for ctx in _remote_ctl._ctxs:
+        for cid, (ctx, aids) in _remote_ctl._ctxs.items():
             await ctx.cancel()

View File

@@ -314,7 +314,6 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
         color.setAlpha(66)
         self.setBrush(fn.mkBrush(color))
         self.setZValue(1e9)
-        self.hide()
 
         label = self._label = QLabel()
         label.setTextFormat(0)  # markdown
 
@@ -343,6 +342,7 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
         ]
 
         self.add_to_view(viewbox)
+        self.hide()
 
     def add_to_view(
         self,
 
@@ -579,10 +579,34 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
         )
         label.show()
 
-    def clear(self):
+    def hide(self):
         '''
-        Clear the selection box from view.
+        Clear the selection box from its graphics scene but
+        don't delete it permanently.
 
         '''
+        super().hide()
         self._label.hide()
-        self.hide()
+
+    # TODO: ensure noone else using dis.
+    clear = hide
+
+    def delete(self) -> None:
+        '''
+        De-allocate this rect from its rendering graphics scene.
+
+        Like a permanent hide.
+
+        '''
+        scen: QGraphicsScene = self.scene()
+        if scen is None:
+            return
+
+        scen.removeItem(self)
+        if (
+            self._label
+            and
+            self._label_proxy
+        ):
+            scen.removeItem(self._label_proxy)

View File

@@ -28,7 +28,17 @@ from typing import (
 import pyqtgraph as pg
 from pyqtgraph import Point, functions as fn
-from PyQt5 import QtCore, QtGui, QtWidgets
+from PyQt5 import (
+    QtCore,
+    QtGui,
+)
+from PyQt5.QtWidgets import (
+    QGraphicsPathItem,
+    QStyleOptionGraphicsItem,
+    QGraphicsItem,
+    QGraphicsScene,
+    QWidget,
+)
 from PyQt5.QtCore import QPointF
 
 from ._annotate import LevelMarker
 
@@ -130,7 +140,7 @@ class LevelLine(pg.InfiniteLine):
         self._right_end_sc: float = 0
 
         # use px caching
-        self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
+        self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
 
     def txt_offsets(self) -> tuple[int, int]:
         return 0, 0
 
@@ -308,7 +318,7 @@ class LevelLine(pg.InfiniteLine):
         Remove this line from containing chart/view/scene.
 
         '''
-        scene = self.scene()
+        scene: QGraphicsScene = self.scene()
         if scene:
             for label in self._labels:
                 label.delete()
 
@@ -339,8 +349,8 @@ class LevelLine(pg.InfiniteLine):
         self,
 
         p: QtGui.QPainter,
-        opt: QtWidgets.QStyleOptionGraphicsItem,
-        w: QtWidgets.QWidget
+        opt: QStyleOptionGraphicsItem,
+        w: QWidget
 
     ) -> None:
         '''
 
@@ -417,9 +427,9 @@ class LevelLine(pg.InfiniteLine):
     def add_marker(
         self,
-        path: QtWidgets.QGraphicsPathItem,
+        path: QGraphicsPathItem,
 
-    ) -> QtWidgets.QGraphicsPathItem:
+    ) -> QGraphicsPathItem:
 
         self._marker = path
         self._marker.setPen(self.currentPen)

View File

@ -20,10 +20,14 @@ to a chart from some other actor.
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager as acm from contextlib import (
asynccontextmanager as acm,
AsyncExitStack,
)
from functools import partial
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
Any, # Any,
AsyncContextManager, AsyncContextManager,
) )
@ -41,6 +45,7 @@ from PyQt5.QtWidgets import (
from piker.log import get_logger from piker.log import get_logger
from piker.types import Struct from piker.types import Struct
from piker.service import find_service from piker.service import find_service
from piker.brokers import SymbolNotFound
from ._display import DisplayState from ._display import DisplayState
from ._interaction import ChartView from ._interaction import ChartView
from ._editors import SelectRect from ._editors import SelectRect
@ -59,11 +64,117 @@ _dss: dict[str, DisplayState] | None = None
# be cancelled on shutdown/error. # be cancelled on shutdown/error.
# TODO: make `tractor.Context` hashable via is `.cid: str`? # TODO: make `tractor.Context` hashable via is `.cid: str`?
# _ctxs: set[Context] = set() # _ctxs: set[Context] = set()
_ctxs: list[Context] = [] # TODO: use type statements from 3.12+
IpcCtxTable = dict[
str, # each `Context.cid`
tuple[
Context, # handle for ctx-cancellation
set[int] # set of annotation (instance) ids
]
]
# global map of all uniquely created annotation-graphics _ctxs: IpcCtxTable = {}
# so that they can be mutated (eventually) by a client.
_annots: dict[int, QGraphicsItem] = {} # XXX: global map of all uniquely created annotation-graphics so
# that they can be mutated (eventually) by a client.
# NOTE: this map is only populated on the `chart` actor side (aka
# the "annotations server" which actually renders to a Qt canvas).
# type AnnotsTable = dict[int, QGraphicsItem]
AnnotsTable = dict[int, QGraphicsItem]
_annots: AnnotsTable = {}
async def serve_rc_annots(
ipc_key: str,
annot_req_stream: MsgStream,
dss: dict[str, DisplayState],
ctxs: IpcCtxTable,
annots: AnnotsTable,
) -> None:
async for msg in annot_req_stream:
match msg:
case {
'annot': 'SelectRect',
'fqme': fqme,
'timeframe': timeframe,
'meth': str(meth),
'kwargs': dict(kwargs),
}:
ds: DisplayState = _dss[fqme]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
# TODO: make this more dynamic?
# -[ ] pull from conf.toml?
# -[ ] add `.set_color()` method to type?
# -[ ] make a green/red based on direction
# instead of default static color?
color=kwargs.pop('color', None),
)
# XXX NOTE: this is REQUIRED to set the rect
# resize callback!
rect.chart: ChartPlotWidget = chart
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
aid: int = id(rect)
annots[aid] = rect
aids: set[int] = ctxs[ipc_key][1]
aids.add(aid)
await annot_req_stream.send(aid)
case {
'rm_annot': int(aid),
}:
# NOTE: this is normally entered on
# a client's annotation de-alloc normally
# prior to detach or modify.
annot: QGraphicsItem = annots[aid]
annot.delete()
# respond to client indicating annot
# was indeed deleted.
await annot_req_stream.send(aid)
case {
'fqme': fqme,
'render': int(aid),
'viz_name': str(viz_name),
'timeframe': timeframe,
}:
# | {
# 'backfilling': (str(viz_name), timeframe),
# }:
ds: DisplayState = _dss[viz_name]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
case _:
log.error(
'Unknown remote annotation cmd:\n'
f'{pformat(msg)}'
)
@tractor.context @tractor.context
@ -74,62 +185,29 @@ async def remote_annotate(
global _dss, _ctxs global _dss, _ctxs
assert _dss assert _dss
_ctxs.append(ctx) _ctxs[ctx.cid] = (ctx, set())
# send back full fqme symbology to caller # send back full fqme symbology to caller
await ctx.started(list(_dss)) await ctx.started(list(_dss))
# open annot request handler stream
async with ctx.open_stream() as annot_req_stream: async with ctx.open_stream() as annot_req_stream:
async for msg in annot_req_stream: try:
match msg: await serve_rc_annots(
case { ipc_key=ctx.cid,
'annot': 'SelectRect', annot_req_stream=annot_req_stream,
'fqme': fqme, dss=_dss,
'timeframe': timeframe, ctxs=_ctxs,
'meth': str(meth), annots=_annots,
'kwargs': dict(kwargs), )
}: finally:
# ensure all annots for this connection are deleted
ds: DisplayState = _dss[fqme] # on any final teardown
chart: ChartPlotWidget = { (_ctx, aids) = _ctxs[ctx.cid]
60: ds.hist_chart, assert _ctx is ctx
1: ds.chart, for aid in aids:
}[timeframe] annot: QGraphicsItem = _annots[aid]
cv: ChartView = chart.cv annot.delete()
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
# TODO: make this more dynamic?
# -[ ] pull from conf.toml?
# -[ ] add `.set_color()` method to type?
# -[ ] make a green/red based on direction
# instead of default static color?
color=kwargs.pop('color', None),
)
# XXX NOTE: this is REQUIRED to set the rect
# resize callback!
rect.chart: ChartPlotWidget = chart
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
await annot_req_stream.send(id(rect))
case _:
log.error(
'Unknown remote annotation cmd:\n'
f'{pformat(msg)}'
)
class AnnotCtl(Struct): class AnnotCtl(Struct):
@ -142,7 +220,12 @@ class AnnotCtl(Struct):
''' '''
ctx2fqmes: dict[str, str] ctx2fqmes: dict[str, str]
fqme2stream: dict[str, MsgStream] fqme2ipc: dict[str, MsgStream]
_annot_stack: AsyncExitStack
# runtime-populated mapping of all annotation
# ids to their equivalent IPC msg-streams.
_ipcs: dict[int, MsgStream] = {}
async def add_rect( async def add_rect(
self, self,
@ -155,13 +238,21 @@ class AnnotCtl(Struct):
domain: str = 'view', # or 'scene' domain: str = 'view', # or 'scene'
color: str = 'dad_blue', color: str = 'dad_blue',
from_acm: bool = False,
) -> int: ) -> int:
''' '''
Add a `SelectRect` annotation to the target view, return Add a `SelectRect` annotation to the target view, return
the instances `id(obj)` from the remote UI actor. the instances `id(obj)` from the remote UI actor.
''' '''
ipc: MsgStream = self.fqme2stream[fqme] ipc: MsgStream = self.fqme2ipc.get(fqme)
if ipc is None:
raise SymbolNotFound(
'No chart (actor) seems to have mkt feed loaded?\n'
f'{fqme}'
)
await ipc.send({ await ipc.send({
'fqme': fqme, 'fqme': fqme,
'annot': 'SelectRect', 'annot': 'SelectRect',
@ -175,32 +266,61 @@ class AnnotCtl(Struct):
'update_label': False, 'update_label': False,
}, },
}) })
return (await ipc.receive()) aid: int = await ipc.receive()
self._ipcs[aid] = ipc
async def modify( if not from_acm:
self, self._annot_stack.push_async_callback(
aid: int, # annotation id partial(
meth: str, # far end graphics object method to invoke self.remove,
params: dict[str, Any], # far end `meth(**kwargs)` aid,
) -> bool: )
''' )
Modify an existing (remote) annotation's graphics return aid
paramters, thus changing it's appearance / state in real
time.
'''
raise NotImplementedError
async def remove( async def remove(
self, self,
uid: int, aid: int,
) -> bool: ) -> bool:
''' '''
Remove an existing annotation by instance id. Remove an existing annotation by instance id.
''' '''
raise NotImplementedError ipc: MsgStream = self._ipcs[aid]
await ipc.send({
'rm_annot': aid,
})
removed: bool = await ipc.receive()
return removed
@acm
async def open_rect(
self,
**kwargs,
) -> int:
try:
aid: int = await self.add_rect(
from_acm=True,
**kwargs,
)
yield aid
finally:
await self.remove(aid)
# TODO: do we even need this?
# async def modify(
# self,
# aid: int, # annotation id
# meth: str, # far end graphics object method to invoke
# params: dict[str, Any], # far end `meth(**kwargs)`
# ) -> bool:
# '''
# Modify an existing (remote) annotation's graphics
# paramters, thus changing it's appearance / state in real
# time.
# '''
# raise NotImplementedError
@acm @acm
@ -235,20 +355,18 @@ async def open_annot_ctl(
) )
ctx2fqmes: dict[str, set[str]] = {} ctx2fqmes: dict[str, set[str]] = {}
fqme2stream: dict[str, MsgStream] = {} fqme2ipc: dict[str, MsgStream] = {}
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2stream=fqme2stream,
)
stream_ctxs: list[AsyncContextManager] = [] stream_ctxs: list[AsyncContextManager] = []
async with trionics.gather_contexts(ctx_mngrs) as ctxs:
async with (
trionics.gather_contexts(ctx_mngrs) as ctxs,
):
for (ctx, fqmes) in ctxs: for (ctx, fqmes) in ctxs:
stream_ctxs.append(ctx.open_stream()) stream_ctxs.append(ctx.open_stream())
# fill lookup table of mkt addrs to IPC ctxs # fill lookup table of mkt addrs to IPC ctxs
for fqme in fqmes: for fqme in fqmes:
if other := fqme2stream.get(fqme): if other := fqme2ipc.get(fqme):
raise ValueError( raise ValueError(
f'More then one chart displays {fqme}!?\n' f'More then one chart displays {fqme}!?\n'
'Other UI actor info:\n' 'Other UI actor info:\n'
@ -266,8 +384,19 @@ async def open_annot_ctl(
for stream in streams: for stream in streams:
fqmes: set[str] = ctx2fqmes[stream._ctx.cid] fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
for fqme in fqmes: for fqme in fqmes:
fqme2stream[fqme] = stream fqme2ipc[fqme] = stream
yield client # NOTE: on graceful teardown we always attempt to
# TODO: on graceful teardown should we try to # remove all annots that were created by the
# remove all annots that were created/modded? # entering client.
# TODO: should we maybe instead/also do this on the
# server-actor side so that when a client
# disconnects we always delete all annotations by
# default instaead of expecting the client to?
async with AsyncExitStack() as annots_stack:
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2ipc=fqme2ipc,
_annot_stack=annots_stack,
)
yield client