Compare commits
10 Commits
a681b2f0bb
...
ad565936ec
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | ad565936ec | |
Tyler Goodlet | d4b07cc95a | |
Tyler Goodlet | 1231c459aa | |
Tyler Goodlet | 88f415e5b8 | |
Tyler Goodlet | d9c574e291 | |
Tyler Goodlet | a86573b5a2 | |
Tyler Goodlet | 1d7e97a295 | |
Tyler Goodlet | bbb98597a0 | |
Tyler Goodlet | e33d6333ec | |
Tyler Goodlet | 263a5a8d07 |
|
@ -33,6 +33,11 @@ from typing import (
|
|||
)
|
||||
|
||||
import tractor
|
||||
from tractor import (
|
||||
Context,
|
||||
MsgStream,
|
||||
Channel,
|
||||
)
|
||||
from tractor.trionics import (
|
||||
maybe_open_nursery,
|
||||
)
|
||||
|
@ -53,7 +58,10 @@ if TYPE_CHECKING:
|
|||
from ._sharedmem import (
|
||||
ShmArray,
|
||||
)
|
||||
from .feed import _FeedsBus
|
||||
from .feed import (
|
||||
_FeedsBus,
|
||||
Sub,
|
||||
)
|
||||
|
||||
|
||||
# highest frequency sample step is 1 second by default, though in
|
||||
|
@ -94,7 +102,7 @@ class Sampler:
|
|||
float,
|
||||
list[
|
||||
float,
|
||||
set[tractor.MsgStream]
|
||||
set[MsgStream]
|
||||
],
|
||||
] = defaultdict(
|
||||
lambda: [
|
||||
|
@ -258,8 +266,8 @@ class Sampler:
|
|||
f'broadcasting {period_s} -> {last_ts}\n'
|
||||
# f'consumers: {subs}'
|
||||
)
|
||||
borked: set[tractor.MsgStream] = set()
|
||||
sent: set[tractor.MsgStream] = set()
|
||||
borked: set[MsgStream] = set()
|
||||
sent: set[MsgStream] = set()
|
||||
while True:
|
||||
try:
|
||||
for stream in (subs - sent):
|
||||
|
@ -314,7 +322,7 @@ class Sampler:
|
|||
|
||||
@tractor.context
|
||||
async def register_with_sampler(
|
||||
ctx: tractor.Context,
|
||||
ctx: Context,
|
||||
period_s: float,
|
||||
shms_by_period: dict[float, dict] | None = None,
|
||||
|
||||
|
@ -649,12 +657,7 @@ async def sample_and_broadcast(
|
|||
# eventually block this producer end of the feed and
|
||||
# thus other consumers still attached.
|
||||
sub_key: str = broker_symbol.lower()
|
||||
subs: list[
|
||||
tuple[
|
||||
tractor.MsgStream | trio.MemorySendChannel,
|
||||
float | None, # tick throttle in Hz
|
||||
]
|
||||
] = bus.get_subs(sub_key)
|
||||
subs: set[Sub] = bus.get_subs(sub_key)
|
||||
|
||||
# NOTE: by default the broker backend doesn't append
|
||||
# it's own "name" into the fqme schema (but maybe it
|
||||
|
@ -663,34 +666,40 @@ async def sample_and_broadcast(
|
|||
fqme: str = f'{broker_symbol}.{brokername}'
|
||||
lags: int = 0
|
||||
|
||||
# TODO: speed up this loop in an AOT compiled lang (like
|
||||
# rust or nim or zig) and/or instead of doing a fan out to
|
||||
# TCP sockets here, we add a shm-style tick queue which
|
||||
# readers can pull from instead of placing the burden of
|
||||
# broadcast on solely on this `brokerd` actor. see issues:
|
||||
# XXX TODO XXX: speed up this loop in an AOT compiled
|
||||
# lang (like rust or nim or zig)!
|
||||
# AND/OR instead of doing a fan out to TCP sockets
|
||||
# here, we add a shm-style tick queue which readers can
|
||||
# pull from instead of placing the burden of broadcast
|
||||
# on solely on this `brokerd` actor. see issues:
|
||||
# - https://github.com/pikers/piker/issues/98
|
||||
# - https://github.com/pikers/piker/issues/107
|
||||
|
||||
for (stream, tick_throttle) in subs.copy():
|
||||
# for (stream, tick_throttle) in subs.copy():
|
||||
for sub in subs.copy():
|
||||
ipc: MsgStream = sub.ipc
|
||||
throttle: float = sub.throttle_rate
|
||||
try:
|
||||
with trio.move_on_after(0.2) as cs:
|
||||
if tick_throttle:
|
||||
if throttle:
|
||||
send_chan: trio.abc.SendChannel = sub.send_chan
|
||||
|
||||
# this is a send mem chan that likely
|
||||
# pushes to the ``uniform_rate_send()`` below.
|
||||
try:
|
||||
stream.send_nowait(
|
||||
send_chan.send_nowait(
|
||||
(fqme, quote)
|
||||
)
|
||||
except trio.WouldBlock:
|
||||
overruns[sub_key] += 1
|
||||
ctx = stream._ctx
|
||||
chan = ctx.chan
|
||||
ctx: Context = ipc._ctx
|
||||
chan: Channel = ctx.chan
|
||||
|
||||
log.warning(
|
||||
f'Feed OVERRUN {sub_key}'
|
||||
'@{bus.brokername} -> \n'
|
||||
f'feed @ {chan.uid}\n'
|
||||
f'throttle = {tick_throttle} Hz'
|
||||
f'throttle = {throttle} Hz'
|
||||
)
|
||||
|
||||
if overruns[sub_key] > 6:
|
||||
|
@ -707,10 +716,10 @@ async def sample_and_broadcast(
|
|||
f'{sub_key}:'
|
||||
f'{ctx.cid}@{chan.uid}'
|
||||
)
|
||||
await stream.aclose()
|
||||
await ipc.aclose()
|
||||
raise trio.BrokenResourceError
|
||||
else:
|
||||
await stream.send(
|
||||
await ipc.send(
|
||||
{fqme: quote}
|
||||
)
|
||||
|
||||
|
@ -724,16 +733,16 @@ async def sample_and_broadcast(
|
|||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
):
|
||||
ctx = stream._ctx
|
||||
chan = ctx.chan
|
||||
ctx: Context = ipc._ctx
|
||||
chan: Channel = ctx.chan
|
||||
if ctx:
|
||||
log.warning(
|
||||
'Dropped `brokerd`-quotes-feed connection:\n'
|
||||
f'{broker_symbol}:'
|
||||
f'{ctx.cid}@{chan.uid}'
|
||||
)
|
||||
if tick_throttle:
|
||||
assert stream._closed
|
||||
if sub.throttle_rate:
|
||||
assert ipc._closed
|
||||
|
||||
# XXX: do we need to deregister here
|
||||
# if it's done in the fee bus code?
|
||||
|
@ -742,7 +751,7 @@ async def sample_and_broadcast(
|
|||
# since there seems to be some kinda race..
|
||||
bus.remove_subs(
|
||||
sub_key,
|
||||
{(stream, tick_throttle)},
|
||||
{sub},
|
||||
)
|
||||
|
||||
|
||||
|
@ -750,7 +759,7 @@ async def uniform_rate_send(
|
|||
|
||||
rate: float,
|
||||
quote_stream: trio.abc.ReceiveChannel,
|
||||
stream: tractor.MsgStream,
|
||||
stream: MsgStream,
|
||||
|
||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ module.
|
|||
from __future__ import annotations
|
||||
from collections import (
|
||||
defaultdict,
|
||||
abc,
|
||||
)
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from functools import partial
|
||||
|
@ -36,7 +37,6 @@ from types import ModuleType
|
|||
from typing import (
|
||||
Any,
|
||||
AsyncContextManager,
|
||||
Optional,
|
||||
Awaitable,
|
||||
Sequence,
|
||||
)
|
||||
|
@ -76,6 +76,31 @@ from ._sampling import (
|
|||
)
|
||||
|
||||
|
||||
class Sub(Struct, frozen=True):
|
||||
'''
|
||||
A live feed subscription entry.
|
||||
|
||||
Contains meta-data on the remote-actor type (in functionality
|
||||
terms) as well as refs to IPC streams and sampler runtime
|
||||
params.
|
||||
|
||||
'''
|
||||
ipc: tractor.MsgStream
|
||||
send_chan: trio.abc.SendChannel | None = None
|
||||
|
||||
# tick throttle rate in Hz; determines how live
|
||||
# quotes/ticks should be downsampled before relay
|
||||
# to the receiving remote consumer (process).
|
||||
throttle_rate: float | None = None
|
||||
_throttle_cs: trio.CancelScope | None = None
|
||||
|
||||
# TODO: actually stash comms info for the far end to allow
|
||||
# `.tsp`, `.fsp` and `.data._sampling` sub-systems to re-render
|
||||
# the data view as needed via msging with the `._remote_ctl`
|
||||
# ipc ctx.
|
||||
rc_ui: bool = False
|
||||
|
||||
|
||||
class _FeedsBus(Struct):
|
||||
'''
|
||||
Data feeds broadcaster and persistence management.
|
||||
|
@ -100,13 +125,7 @@ class _FeedsBus(Struct):
|
|||
|
||||
_subscribers: defaultdict[
|
||||
str,
|
||||
set[
|
||||
tuple[
|
||||
tractor.MsgStream | trio.MemorySendChannel,
|
||||
# tractor.Context,
|
||||
float | None, # tick throttle in Hz
|
||||
]
|
||||
]
|
||||
set[Sub]
|
||||
] = defaultdict(set)
|
||||
|
||||
async def start_task(
|
||||
|
@ -140,31 +159,28 @@ class _FeedsBus(Struct):
|
|||
def get_subs(
|
||||
self,
|
||||
key: str,
|
||||
) -> set[
|
||||
tuple[
|
||||
tractor.MsgStream | trio.MemorySendChannel,
|
||||
float | None, # tick throttle in Hz
|
||||
]
|
||||
]:
|
||||
|
||||
) -> set[Sub]:
|
||||
'''
|
||||
Get the ``set`` of consumer subscription entries for the given key.
|
||||
|
||||
'''
|
||||
return self._subscribers[key]
|
||||
|
||||
def subs_items(self) -> abc.ItemsView[str, set[Sub]]:
|
||||
return self._subscribers.items()
|
||||
|
||||
def add_subs(
|
||||
self,
|
||||
key: str,
|
||||
subs: set[tuple[
|
||||
tractor.MsgStream | trio.MemorySendChannel,
|
||||
float | None, # tick throttle in Hz
|
||||
]],
|
||||
) -> set[tuple]:
|
||||
subs: set[Sub],
|
||||
|
||||
) -> set[Sub]:
|
||||
'''
|
||||
Add a ``set`` of consumer subscription entries for the given key.
|
||||
|
||||
'''
|
||||
_subs: set[tuple] = self._subscribers[key]
|
||||
_subs: set[Sub] = self._subscribers.setdefault(key, set())
|
||||
_subs.update(subs)
|
||||
return _subs
|
||||
|
||||
|
@ -441,8 +457,9 @@ async def open_feed_bus(
|
|||
symbols: list[str], # normally expected to the broker-specific fqme
|
||||
|
||||
loglevel: str = 'error',
|
||||
tick_throttle: Optional[float] = None,
|
||||
tick_throttle: float | None = None,
|
||||
start_stream: bool = True,
|
||||
allow_remote_ctl_ui: bool = False,
|
||||
|
||||
) -> dict[
|
||||
str, # fqme
|
||||
|
@ -519,10 +536,10 @@ async def open_feed_bus(
|
|||
# pack for ``.started()`` sync msg
|
||||
flumes[fqme] = flume
|
||||
|
||||
# we use the broker-specific fqme (bs_fqme) for the
|
||||
# sampler subscription since the backend isn't (yet) expected to
|
||||
# append it's own name to the fqme, so we filter on keys which
|
||||
# *do not* include that name (e.g .ib) .
|
||||
# we use the broker-specific fqme (bs_fqme) for the sampler
|
||||
# subscription since the backend isn't (yet) expected to
|
||||
# append it's own name to the fqme, so we filter on keys
|
||||
# which *do not* include that name (e.g .ib) .
|
||||
bus._subscribers.setdefault(bs_fqme, set())
|
||||
|
||||
# sync feed subscribers with flume handles
|
||||
|
@ -561,49 +578,60 @@ async def open_feed_bus(
|
|||
# that the ``sample_and_broadcast()`` task (spawned inside
|
||||
# ``allocate_persistent_feed()``) will push real-time quote
|
||||
# (ticks) to this new consumer.
|
||||
|
||||
cs: trio.CancelScope | None = None
|
||||
send: trio.MemorySendChannel | None = None
|
||||
if tick_throttle:
|
||||
flume.throttle_rate = tick_throttle
|
||||
|
||||
# open a bg task which receives quotes over a mem chan
|
||||
# and only pushes them to the target actor-consumer at
|
||||
# a max ``tick_throttle`` instantaneous rate.
|
||||
# open a bg task which receives quotes over a mem
|
||||
# chan and only pushes them to the target
|
||||
# actor-consumer at a max ``tick_throttle``
|
||||
# (instantaneous) rate.
|
||||
send, recv = trio.open_memory_channel(2**10)
|
||||
|
||||
cs = await bus.start_task(
|
||||
# NOTE: the ``.send`` channel here is a swapped-in
|
||||
# trio mem chan which gets `.send()`-ed by the normal
|
||||
# sampler task but instead of being sent directly
|
||||
# over the IPC msg stream it's the throttle task
|
||||
# does the work of incrementally forwarding to the
|
||||
# IPC stream at the throttle rate.
|
||||
cs: trio.CancelScope = await bus.start_task(
|
||||
uniform_rate_send,
|
||||
tick_throttle,
|
||||
recv,
|
||||
stream,
|
||||
)
|
||||
# NOTE: so the ``send`` channel here is actually a swapped
|
||||
# in trio mem chan which gets pushed by the normal sampler
|
||||
# task but instead of being sent directly over the IPC msg
|
||||
# stream it's the throttle task does the work of
|
||||
# incrementally forwarding to the IPC stream at the throttle
|
||||
# rate.
|
||||
send._ctx = ctx # mock internal ``tractor.MsgStream`` ref
|
||||
sub = (send, tick_throttle)
|
||||
|
||||
else:
|
||||
sub = (stream, tick_throttle)
|
||||
sub = Sub(
|
||||
ipc=stream,
|
||||
send_chan=send,
|
||||
throttle_rate=tick_throttle,
|
||||
_throttle_cs=cs,
|
||||
rc_ui=allow_remote_ctl_ui,
|
||||
)
|
||||
|
||||
# TODO: add an api for this on the bus?
|
||||
# maybe use the current task-id to key the sub list that's
|
||||
# added / removed? Or maybe we can add a general
|
||||
# pause-resume by sub-key api?
|
||||
bs_fqme = fqme.removesuffix(f'.{brokername}')
|
||||
local_subs.setdefault(bs_fqme, set()).add(sub)
|
||||
bus.add_subs(bs_fqme, {sub})
|
||||
local_subs.setdefault(
|
||||
bs_fqme,
|
||||
set()
|
||||
).add(sub)
|
||||
bus.add_subs(
|
||||
bs_fqme,
|
||||
{sub}
|
||||
)
|
||||
|
||||
# sync caller with all subs registered state
|
||||
sub_registered.set()
|
||||
|
||||
uid = ctx.chan.uid
|
||||
uid: tuple[str, str] = ctx.chan.uid
|
||||
try:
|
||||
# ctrl protocol for start/stop of quote streams based on UI
|
||||
# state (eg. don't need a stream when a symbol isn't being
|
||||
# displayed).
|
||||
# ctrl protocol for start/stop of live quote streams
|
||||
# based on UI state (eg. don't need a stream when
|
||||
# a symbol isn't being displayed).
|
||||
async for msg in stream:
|
||||
|
||||
if msg == 'pause':
|
||||
|
@ -760,7 +788,7 @@ async def install_brokerd_search(
|
|||
async def maybe_open_feed(
|
||||
|
||||
fqmes: list[str],
|
||||
loglevel: Optional[str] = None,
|
||||
loglevel: str | None = None,
|
||||
|
||||
**kwargs,
|
||||
|
||||
|
@ -820,6 +848,8 @@ async def open_feed(
|
|||
start_stream: bool = True,
|
||||
tick_throttle: float | None = None, # Hz
|
||||
|
||||
allow_remote_ctl_ui: bool = False,
|
||||
|
||||
) -> Feed:
|
||||
'''
|
||||
Open a "data feed" which provides streamed real-time quotes.
|
||||
|
@ -902,6 +932,12 @@ async def open_feed(
|
|||
# of these stream open sequences sequentially per
|
||||
# backend? .. need some thot!
|
||||
allow_overruns=True,
|
||||
|
||||
# NOTE: UI actors (like charts) can allow
|
||||
# remote control of certain graphics rendering
|
||||
# capabilities via the
|
||||
# `.ui._remote_ctl.remote_annotate()` msg loop.
|
||||
allow_remote_ctl_ui=allow_remote_ctl_ui,
|
||||
)
|
||||
)
|
||||
|
||||
|
|
|
@ -20,8 +20,12 @@ Storage middle-ware CLIs.
|
|||
"""
|
||||
from __future__ import annotations
|
||||
# from datetime import datetime
|
||||
# from contextlib import (
|
||||
# AsyncExitStack,
|
||||
# )
|
||||
from pathlib import Path
|
||||
import time
|
||||
from types import ModuleType
|
||||
|
||||
import polars as pl
|
||||
import numpy as np
|
||||
|
@ -34,7 +38,6 @@ import typer
|
|||
|
||||
from piker.service import open_piker_runtime
|
||||
from piker.cli import cli
|
||||
from piker.config import get_conf_dir
|
||||
from piker.data import (
|
||||
ShmArray,
|
||||
)
|
||||
|
@ -45,6 +48,7 @@ from . import (
|
|||
from . import (
|
||||
__tsdbs__,
|
||||
open_storage_client,
|
||||
StorageClient,
|
||||
)
|
||||
|
||||
|
||||
|
@ -232,7 +236,8 @@ def anal(
|
|||
@store.command()
|
||||
def ldshm(
|
||||
fqme: str,
|
||||
write_parquet: bool = False,
|
||||
write_parquet: bool = True,
|
||||
reload_parquet_to_shm: bool = True,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -242,15 +247,32 @@ def ldshm(
|
|||
|
||||
'''
|
||||
async def main():
|
||||
from piker.ui._remote_ctl import (
|
||||
open_annot_ctl,
|
||||
AnnotCtl,
|
||||
)
|
||||
actl: AnnotCtl
|
||||
mod: ModuleType
|
||||
client: StorageClient
|
||||
async with (
|
||||
open_piker_runtime(
|
||||
'polars_boi',
|
||||
enable_modules=['piker.data._sharedmem'],
|
||||
debug_mode=True,
|
||||
),
|
||||
open_storage_client() as (
|
||||
mod,
|
||||
client,
|
||||
),
|
||||
open_annot_ctl() as actl,
|
||||
):
|
||||
df: pl.DataFrame | None = None
|
||||
for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):
|
||||
shm_df: pl.DataFrame | None = None
|
||||
for (
|
||||
shmfile,
|
||||
shm,
|
||||
# parquet_path,
|
||||
shm_df,
|
||||
) in tsp.iter_dfs_from_shms(fqme):
|
||||
|
||||
# compute ohlc properties for naming
|
||||
times: np.ndarray = shm.array['time']
|
||||
|
@ -275,20 +297,14 @@ def ldshm(
|
|||
period=period_s,
|
||||
)
|
||||
|
||||
# TODO: maybe only optionally enter this depending
|
||||
# on some CLI flags and/or gap detection?
|
||||
if (
|
||||
needs_correction: bool = (
|
||||
not gaps.is_empty()
|
||||
or null_segs
|
||||
):
|
||||
from piker.ui._remote_ctl import (
|
||||
open_annot_ctl,
|
||||
AnnotCtl,
|
||||
)
|
||||
annot_ctl: AnnotCtl
|
||||
async with open_annot_ctl() as annot_ctl:
|
||||
# TODO: maybe only optionally enter this depending
|
||||
# on some CLI flags and/or gap detection?
|
||||
if needs_correction:
|
||||
for i in range(gaps.height):
|
||||
|
||||
row: pl.DataFrame = gaps[i]
|
||||
|
||||
# TODO: can we eventually remove this
|
||||
|
@ -314,9 +330,8 @@ def ldshm(
|
|||
|
||||
# the gap's left-most bar's CLOSE value
|
||||
# at that time (sample) step.
|
||||
|
||||
prev_r: pl.DataFrame = df.filter(
|
||||
pl.col('index') == gaps[0]['index'] - 1
|
||||
pl.col('index') == iend - 1
|
||||
)
|
||||
istart: int = prev_r['index'][0]
|
||||
# dt_start_t: float = dt_prev.timestamp()
|
||||
|
@ -332,7 +347,6 @@ def ldshm(
|
|||
# and ensure at least as many px-cols
|
||||
# shown per rect as configured by user.
|
||||
gap_w: float = abs((iend - istart))
|
||||
# await tractor.pause()
|
||||
if gap_w < 6:
|
||||
margin: float = 6
|
||||
iend += margin
|
||||
|
@ -349,49 +363,71 @@ def ldshm(
|
|||
prev_r['close'][0],
|
||||
)
|
||||
|
||||
aid: int = await annot_ctl.add_rect(
|
||||
# async with actl.open_rect(
|
||||
# ) as aid:
|
||||
aid: int = await actl.add_rect(
|
||||
fqme=fqme,
|
||||
timeframe=period_s,
|
||||
start_pos=lc,
|
||||
end_pos=ro,
|
||||
)
|
||||
assert aid
|
||||
await tractor.pause()
|
||||
|
||||
# write to parquet file?
|
||||
if write_parquet:
|
||||
timeframe: str = f'{period_s}s'
|
||||
|
||||
datadir: Path = get_conf_dir() / 'nativedb'
|
||||
if not datadir.is_dir():
|
||||
datadir.mkdir()
|
||||
|
||||
path: Path = datadir / f'{fqme}.{timeframe}.parquet'
|
||||
|
||||
if (
|
||||
write_parquet
|
||||
):
|
||||
# write to fs
|
||||
start = time.time()
|
||||
df.write_parquet(path)
|
||||
delay: float = round(
|
||||
path: Path = await client.write_ohlcv(
|
||||
fqme,
|
||||
ohlcv=deduped,
|
||||
timeframe=period_s,
|
||||
)
|
||||
write_delay: float = round(
|
||||
time.time() - start,
|
||||
ndigits=6,
|
||||
)
|
||||
log.info(
|
||||
f'parquet write took {delay} secs\n'
|
||||
f'file path: {path}'
|
||||
)
|
||||
|
||||
# read back from fs
|
||||
start = time.time()
|
||||
read_df: pl.DataFrame = pl.read_parquet(path)
|
||||
delay: float = round(
|
||||
read_delay: float = round(
|
||||
time.time() - start,
|
||||
ndigits=6,
|
||||
)
|
||||
print(
|
||||
f'parquet read took {delay} secs\n'
|
||||
log.info(
|
||||
f'parquet write took {write_delay} secs\n'
|
||||
f'file path: {path}'
|
||||
f'parquet read took {read_delay} secs\n'
|
||||
f'polars df: {read_df}'
|
||||
)
|
||||
|
||||
if reload_parquet_to_shm:
|
||||
new = tsp.pl2np(
|
||||
deduped,
|
||||
dtype=shm.array.dtype,
|
||||
)
|
||||
# since normally readonly
|
||||
shm._array.setflags(
|
||||
write=int(1),
|
||||
)
|
||||
shm.push(
|
||||
new,
|
||||
prepend=True,
|
||||
start=new['index'][-1],
|
||||
update_first=False, # don't update ._first
|
||||
)
|
||||
|
||||
await tractor.pause()
|
||||
assert diff
|
||||
|
||||
else:
|
||||
# allow interaction even when no ts problems.
|
||||
await tractor.pause()
|
||||
assert not diff
|
||||
|
||||
|
||||
if df is None:
|
||||
log.error(f'No matching shm buffers for {fqme} ?')
|
||||
|
||||
|
|
|
@ -95,16 +95,19 @@ def detect_period(shm: ShmArray) -> float:
|
|||
|
||||
def mk_ohlcv_shm_keyed_filepath(
|
||||
fqme: str,
|
||||
period: float, # ow known as the "timeframe"
|
||||
period: float | int, # ow known as the "timeframe"
|
||||
datadir: Path,
|
||||
|
||||
) -> str:
|
||||
) -> Path:
|
||||
|
||||
if period < 1.:
|
||||
raise ValueError('Sample period should be >= 1.!?')
|
||||
|
||||
period_s: str = f'{period}s'
|
||||
path: Path = datadir / f'{fqme}.ohlcv{period_s}.parquet'
|
||||
path: Path = (
|
||||
datadir
|
||||
/
|
||||
f'{fqme}.ohlcv{int(period)}s.parquet'
|
||||
)
|
||||
return path
|
||||
|
||||
|
||||
|
@ -227,6 +230,7 @@ class NativeStorageClient:
|
|||
self,
|
||||
fqme: str,
|
||||
period: float,
|
||||
|
||||
) -> Path:
|
||||
return mk_ohlcv_shm_keyed_filepath(
|
||||
fqme=fqme,
|
||||
|
@ -239,6 +243,7 @@ class NativeStorageClient:
|
|||
fqme: str,
|
||||
df: pl.DataFrame,
|
||||
timeframe: float,
|
||||
|
||||
) -> None:
|
||||
# cache df for later usage since we (currently) need to
|
||||
# convert to np.ndarrays to push to our `ShmArray` rt
|
||||
|
|
|
@ -120,7 +120,7 @@ from ..storage import TimeseriesNotFound
|
|||
if TYPE_CHECKING:
|
||||
from bidict import bidict
|
||||
from ..service.marketstore import StorageClient
|
||||
from .feed import _FeedsBus
|
||||
# from .feed import _FeedsBus
|
||||
|
||||
|
||||
# `ShmArray` buffer sizing configuration:
|
||||
|
@ -1352,9 +1352,7 @@ def iter_dfs_from_shms(
|
|||
readonly=True,
|
||||
)
|
||||
assert not opened
|
||||
ohlcv = shm.array
|
||||
|
||||
from ._anal import np2pl
|
||||
ohlcv: np.ndarray = shm.array
|
||||
df: pl.DataFrame = np2pl(ohlcv)
|
||||
|
||||
yield (
|
||||
|
|
|
@ -620,7 +620,11 @@ def detect_price_gaps(
|
|||
...
|
||||
|
||||
|
||||
def dedupe(src_df: pl.DataFrame) -> tuple[
|
||||
def dedupe(
|
||||
src_df: pl.DataFrame,
|
||||
sort: bool = True,
|
||||
|
||||
) -> tuple[
|
||||
pl.DataFrame, # with dts
|
||||
pl.DataFrame, # gaps
|
||||
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
|
||||
|
@ -634,6 +638,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
|
|||
|
||||
'''
|
||||
df: pl.DataFrame = with_dts(src_df)
|
||||
|
||||
# TODO: enable passing existing `with_dts` df for speedup?
|
||||
gaps: pl.DataFrame = detect_time_gaps(df)
|
||||
|
||||
# if no gaps detected just return carbon copies
|
||||
|
@ -651,8 +657,10 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
|
|||
subset=['dt'],
|
||||
maintain_order=True,
|
||||
)
|
||||
if sort:
|
||||
deduped = deduped.sort(by='time')
|
||||
|
||||
deduped_gaps = detect_time_gaps(deduped)
|
||||
deduped_gaps: pl.DataFrame = detect_time_gaps(deduped)
|
||||
|
||||
diff: int = (
|
||||
df.height
|
||||
|
@ -660,7 +668,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
|
|||
deduped.height
|
||||
)
|
||||
log.warning(
|
||||
f'Gaps found:\n{gaps}\n'
|
||||
f'TIME GAPs FOUND:\n'
|
||||
# f'{gaps}\n'
|
||||
f'deduped Gaps found:\n{deduped_gaps}'
|
||||
)
|
||||
return (
|
||||
|
|
|
@ -471,6 +471,9 @@ async def graphics_update_loop(
|
|||
await tractor.pause()
|
||||
|
||||
try:
|
||||
# XXX TODO: we need to do _dss UPDATE here so that when
|
||||
# a feed-view is switched you can still remote annotate the
|
||||
# prior view..
|
||||
from . import _remote_ctl
|
||||
_remote_ctl._dss = dss
|
||||
|
||||
|
@ -526,7 +529,7 @@ async def graphics_update_loop(
|
|||
finally:
|
||||
# XXX: cancel any remote annotation control ctxs
|
||||
_remote_ctl._dss = None
|
||||
for ctx in _remote_ctl._ctxs:
|
||||
for cid, (ctx, aids) in _remote_ctl._ctxs.items():
|
||||
await ctx.cancel()
|
||||
|
||||
|
||||
|
|
|
@ -314,7 +314,6 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
|||
color.setAlpha(66)
|
||||
self.setBrush(fn.mkBrush(color))
|
||||
self.setZValue(1e9)
|
||||
self.hide()
|
||||
|
||||
label = self._label = QLabel()
|
||||
label.setTextFormat(0) # markdown
|
||||
|
@ -343,6 +342,7 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
|||
]
|
||||
|
||||
self.add_to_view(viewbox)
|
||||
self.hide()
|
||||
|
||||
def add_to_view(
|
||||
self,
|
||||
|
@ -579,10 +579,34 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
|||
)
|
||||
label.show()
|
||||
|
||||
def clear(self):
|
||||
def hide(self):
|
||||
'''
|
||||
Clear the selection box from view.
|
||||
Clear the selection box from its graphics scene but
|
||||
don't delete it permanently.
|
||||
|
||||
'''
|
||||
super().hide()
|
||||
self._label.hide()
|
||||
self.hide()
|
||||
|
||||
# TODO: ensure noone else using dis.
|
||||
clear = hide
|
||||
|
||||
def delete(self) -> None:
|
||||
'''
|
||||
De-allocate this rect from its rendering graphics scene.
|
||||
|
||||
Like a permanent hide.
|
||||
|
||||
'''
|
||||
scen: QGraphicsScene = self.scene()
|
||||
if scen is None:
|
||||
return
|
||||
|
||||
scen.removeItem(self)
|
||||
if (
|
||||
self._label
|
||||
and
|
||||
self._label_proxy
|
||||
|
||||
):
|
||||
scen.removeItem(self._label_proxy)
|
||||
|
|
|
@ -28,7 +28,17 @@ from typing import (
|
|||
|
||||
import pyqtgraph as pg
|
||||
from pyqtgraph import Point, functions as fn
|
||||
from PyQt5 import QtCore, QtGui, QtWidgets
|
||||
from PyQt5 import (
|
||||
QtCore,
|
||||
QtGui,
|
||||
)
|
||||
from PyQt5.QtWidgets import (
|
||||
QGraphicsPathItem,
|
||||
QStyleOptionGraphicsItem,
|
||||
QGraphicsItem,
|
||||
QGraphicsScene,
|
||||
QWidget,
|
||||
)
|
||||
from PyQt5.QtCore import QPointF
|
||||
|
||||
from ._annotate import LevelMarker
|
||||
|
@ -130,7 +140,7 @@ class LevelLine(pg.InfiniteLine):
|
|||
self._right_end_sc: float = 0
|
||||
|
||||
# use px caching
|
||||
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
|
||||
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
|
||||
|
||||
def txt_offsets(self) -> tuple[int, int]:
|
||||
return 0, 0
|
||||
|
@ -308,7 +318,7 @@ class LevelLine(pg.InfiniteLine):
|
|||
Remove this line from containing chart/view/scene.
|
||||
|
||||
'''
|
||||
scene = self.scene()
|
||||
scene: QGraphicsScene = self.scene()
|
||||
if scene:
|
||||
for label in self._labels:
|
||||
label.delete()
|
||||
|
@ -339,8 +349,8 @@ class LevelLine(pg.InfiniteLine):
|
|||
self,
|
||||
|
||||
p: QtGui.QPainter,
|
||||
opt: QtWidgets.QStyleOptionGraphicsItem,
|
||||
w: QtWidgets.QWidget
|
||||
opt: QStyleOptionGraphicsItem,
|
||||
w: QWidget
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
@ -417,9 +427,9 @@ class LevelLine(pg.InfiniteLine):
|
|||
|
||||
def add_marker(
|
||||
self,
|
||||
path: QtWidgets.QGraphicsPathItem,
|
||||
path: QGraphicsPathItem,
|
||||
|
||||
) -> QtWidgets.QGraphicsPathItem:
|
||||
) -> QGraphicsPathItem:
|
||||
|
||||
self._marker = path
|
||||
self._marker.setPen(self.currentPen)
|
||||
|
|
|
@ -20,10 +20,14 @@ to a chart from some other actor.
|
|||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
AsyncExitStack,
|
||||
)
|
||||
from functools import partial
|
||||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
# Any,
|
||||
AsyncContextManager,
|
||||
)
|
||||
|
||||
|
@ -41,6 +45,7 @@ from PyQt5.QtWidgets import (
|
|||
from piker.log import get_logger
|
||||
from piker.types import Struct
|
||||
from piker.service import find_service
|
||||
from piker.brokers import SymbolNotFound
|
||||
from ._display import DisplayState
|
||||
from ._interaction import ChartView
|
||||
from ._editors import SelectRect
|
||||
|
@ -59,27 +64,35 @@ _dss: dict[str, DisplayState] | None = None
|
|||
# be cancelled on shutdown/error.
|
||||
# TODO: make `tractor.Context` hashable via is `.cid: str`?
|
||||
# _ctxs: set[Context] = set()
|
||||
_ctxs: list[Context] = []
|
||||
# TODO: use type statements from 3.12+
|
||||
IpcCtxTable = dict[
|
||||
str, # each `Context.cid`
|
||||
tuple[
|
||||
Context, # handle for ctx-cancellation
|
||||
set[int] # set of annotation (instance) ids
|
||||
]
|
||||
]
|
||||
|
||||
# global map of all uniquely created annotation-graphics
|
||||
# so that they can be mutated (eventually) by a client.
|
||||
_annots: dict[int, QGraphicsItem] = {}
|
||||
_ctxs: IpcCtxTable = {}
|
||||
|
||||
# XXX: global map of all uniquely created annotation-graphics so
|
||||
# that they can be mutated (eventually) by a client.
|
||||
# NOTE: this map is only populated on the `chart` actor side (aka
|
||||
# the "annotations server" which actually renders to a Qt canvas).
|
||||
# type AnnotsTable = dict[int, QGraphicsItem]
|
||||
AnnotsTable = dict[int, QGraphicsItem]
|
||||
|
||||
_annots: AnnotsTable = {}
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def remote_annotate(
|
||||
ctx: Context,
|
||||
async def serve_rc_annots(
|
||||
ipc_key: str,
|
||||
annot_req_stream: MsgStream,
|
||||
dss: dict[str, DisplayState],
|
||||
ctxs: IpcCtxTable,
|
||||
annots: AnnotsTable,
|
||||
|
||||
) -> None:
|
||||
|
||||
global _dss, _ctxs
|
||||
assert _dss
|
||||
|
||||
_ctxs.append(ctx)
|
||||
|
||||
# send back full fqme symbology to caller
|
||||
await ctx.started(list(_dss))
|
||||
|
||||
async with ctx.open_stream() as annot_req_stream:
|
||||
async for msg in annot_req_stream:
|
||||
match msg:
|
||||
case {
|
||||
|
@ -123,7 +136,39 @@ async def remote_annotate(
|
|||
# delegate generically to the requested method
|
||||
getattr(rect, meth)(**kwargs)
|
||||
rect.show()
|
||||
await annot_req_stream.send(id(rect))
|
||||
aid: int = id(rect)
|
||||
annots[aid] = rect
|
||||
aids: set[int] = ctxs[ipc_key][1]
|
||||
aids.add(aid)
|
||||
await annot_req_stream.send(aid)
|
||||
|
||||
case {
|
||||
'rm_annot': int(aid),
|
||||
}:
|
||||
# NOTE: this is normally entered on
|
||||
# a client's annotation de-alloc normally
|
||||
# prior to detach or modify.
|
||||
annot: QGraphicsItem = annots[aid]
|
||||
annot.delete()
|
||||
|
||||
# respond to client indicating annot
|
||||
# was indeed deleted.
|
||||
await annot_req_stream.send(aid)
|
||||
|
||||
case {
|
||||
'fqme': fqme,
|
||||
'render': int(aid),
|
||||
'viz_name': str(viz_name),
|
||||
'timeframe': timeframe,
|
||||
}:
|
||||
# | {
|
||||
# 'backfilling': (str(viz_name), timeframe),
|
||||
# }:
|
||||
ds: DisplayState = _dss[viz_name]
|
||||
chart: ChartPlotWidget = {
|
||||
60: ds.hist_chart,
|
||||
1: ds.chart,
|
||||
}[timeframe]
|
||||
|
||||
case _:
|
||||
log.error(
|
||||
|
@ -132,6 +177,39 @@ async def remote_annotate(
|
|||
)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def remote_annotate(
|
||||
ctx: Context,
|
||||
) -> None:
|
||||
|
||||
global _dss, _ctxs
|
||||
assert _dss
|
||||
|
||||
_ctxs[ctx.cid] = (ctx, set())
|
||||
|
||||
# send back full fqme symbology to caller
|
||||
await ctx.started(list(_dss))
|
||||
|
||||
# open annot request handler stream
|
||||
async with ctx.open_stream() as annot_req_stream:
|
||||
try:
|
||||
await serve_rc_annots(
|
||||
ipc_key=ctx.cid,
|
||||
annot_req_stream=annot_req_stream,
|
||||
dss=_dss,
|
||||
ctxs=_ctxs,
|
||||
annots=_annots,
|
||||
)
|
||||
finally:
|
||||
# ensure all annots for this connection are deleted
|
||||
# on any final teardown
|
||||
(_ctx, aids) = _ctxs[ctx.cid]
|
||||
assert _ctx is ctx
|
||||
for aid in aids:
|
||||
annot: QGraphicsItem = _annots[aid]
|
||||
annot.delete()
|
||||
|
||||
|
||||
class AnnotCtl(Struct):
|
||||
'''
|
||||
A control for remote "data annotations".
|
||||
|
@ -142,7 +220,12 @@ class AnnotCtl(Struct):
|
|||
|
||||
'''
|
||||
ctx2fqmes: dict[str, str]
|
||||
fqme2stream: dict[str, MsgStream]
|
||||
fqme2ipc: dict[str, MsgStream]
|
||||
_annot_stack: AsyncExitStack
|
||||
|
||||
# runtime-populated mapping of all annotation
|
||||
# ids to their equivalent IPC msg-streams.
|
||||
_ipcs: dict[int, MsgStream] = {}
|
||||
|
||||
async def add_rect(
|
||||
self,
|
||||
|
@ -155,13 +238,21 @@ class AnnotCtl(Struct):
|
|||
domain: str = 'view', # or 'scene'
|
||||
color: str = 'dad_blue',
|
||||
|
||||
from_acm: bool = False,
|
||||
|
||||
) -> int:
|
||||
'''
|
||||
Add a `SelectRect` annotation to the target view, return
|
||||
the instances `id(obj)` from the remote UI actor.
|
||||
|
||||
'''
|
||||
ipc: MsgStream = self.fqme2stream[fqme]
|
||||
ipc: MsgStream = self.fqme2ipc.get(fqme)
|
||||
if ipc is None:
|
||||
raise SymbolNotFound(
|
||||
'No chart (actor) seems to have mkt feed loaded?\n'
|
||||
f'{fqme}'
|
||||
)
|
||||
|
||||
await ipc.send({
|
||||
'fqme': fqme,
|
||||
'annot': 'SelectRect',
|
||||
|
@ -175,32 +266,61 @@ class AnnotCtl(Struct):
|
|||
'update_label': False,
|
||||
},
|
||||
})
|
||||
return (await ipc.receive())
|
||||
|
||||
async def modify(
|
||||
self,
|
||||
aid: int, # annotation id
|
||||
meth: str, # far end graphics object method to invoke
|
||||
params: dict[str, Any], # far end `meth(**kwargs)`
|
||||
) -> bool:
|
||||
'''
|
||||
Modify an existing (remote) annotation's graphics
|
||||
paramters, thus changing it's appearance / state in real
|
||||
time.
|
||||
|
||||
'''
|
||||
raise NotImplementedError
|
||||
aid: int = await ipc.receive()
|
||||
self._ipcs[aid] = ipc
|
||||
if not from_acm:
|
||||
self._annot_stack.push_async_callback(
|
||||
partial(
|
||||
self.remove,
|
||||
aid,
|
||||
)
|
||||
)
|
||||
return aid
|
||||
|
||||
async def remove(
|
||||
self,
|
||||
uid: int,
|
||||
aid: int,
|
||||
|
||||
) -> bool:
|
||||
'''
|
||||
Remove an existing annotation by instance id.
|
||||
|
||||
'''
|
||||
raise NotImplementedError
|
||||
ipc: MsgStream = self._ipcs[aid]
|
||||
await ipc.send({
|
||||
'rm_annot': aid,
|
||||
})
|
||||
removed: bool = await ipc.receive()
|
||||
return removed
|
||||
|
||||
@acm
|
||||
async def open_rect(
|
||||
self,
|
||||
**kwargs,
|
||||
) -> int:
|
||||
try:
|
||||
aid: int = await self.add_rect(
|
||||
from_acm=True,
|
||||
**kwargs,
|
||||
)
|
||||
yield aid
|
||||
finally:
|
||||
await self.remove(aid)
|
||||
|
||||
# TODO: do we even need this?
|
||||
# async def modify(
|
||||
# self,
|
||||
# aid: int, # annotation id
|
||||
# meth: str, # far end graphics object method to invoke
|
||||
# params: dict[str, Any], # far end `meth(**kwargs)`
|
||||
# ) -> bool:
|
||||
# '''
|
||||
# Modify an existing (remote) annotation's graphics
|
||||
# paramters, thus changing it's appearance / state in real
|
||||
# time.
|
||||
|
||||
# '''
|
||||
# raise NotImplementedError
|
||||
|
||||
|
||||
@acm
|
||||
|
@ -235,20 +355,18 @@ async def open_annot_ctl(
|
|||
)
|
||||
|
||||
ctx2fqmes: dict[str, set[str]] = {}
|
||||
fqme2stream: dict[str, MsgStream] = {}
|
||||
client = AnnotCtl(
|
||||
ctx2fqmes=ctx2fqmes,
|
||||
fqme2stream=fqme2stream,
|
||||
)
|
||||
|
||||
fqme2ipc: dict[str, MsgStream] = {}
|
||||
stream_ctxs: list[AsyncContextManager] = []
|
||||
async with trionics.gather_contexts(ctx_mngrs) as ctxs:
|
||||
|
||||
async with (
|
||||
trionics.gather_contexts(ctx_mngrs) as ctxs,
|
||||
):
|
||||
for (ctx, fqmes) in ctxs:
|
||||
stream_ctxs.append(ctx.open_stream())
|
||||
|
||||
# fill lookup table of mkt addrs to IPC ctxs
|
||||
for fqme in fqmes:
|
||||
if other := fqme2stream.get(fqme):
|
||||
if other := fqme2ipc.get(fqme):
|
||||
raise ValueError(
|
||||
f'More then one chart displays {fqme}!?\n'
|
||||
'Other UI actor info:\n'
|
||||
|
@ -266,8 +384,19 @@ async def open_annot_ctl(
|
|||
for stream in streams:
|
||||
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
|
||||
for fqme in fqmes:
|
||||
fqme2stream[fqme] = stream
|
||||
fqme2ipc[fqme] = stream
|
||||
|
||||
# NOTE: on graceful teardown we always attempt to
|
||||
# remove all annots that were created by the
|
||||
# entering client.
|
||||
# TODO: should we maybe instead/also do this on the
|
||||
# server-actor side so that when a client
|
||||
# disconnects we always delete all annotations by
|
||||
# default instaead of expecting the client to?
|
||||
async with AsyncExitStack() as annots_stack:
|
||||
client = AnnotCtl(
|
||||
ctx2fqmes=ctx2fqmes,
|
||||
fqme2ipc=fqme2ipc,
|
||||
_annot_stack=annots_stack,
|
||||
)
|
||||
yield client
|
||||
# TODO: on graceful teardown should we try to
|
||||
# remove all annots that were created/modded?
|
||||
|
|
Loading…
Reference in New Issue