Compare commits

..

No commits in common. "ad565936ec669d5e324df51963ac6f2fcf482e62" and "a681b2f0bbc7fe03a00b0fb931527b4c901f5140" have entirely different histories.

10 changed files with 294 additions and 553 deletions

View File

@ -33,11 +33,6 @@ from typing import (
) )
import tractor import tractor
from tractor import (
Context,
MsgStream,
Channel,
)
from tractor.trionics import ( from tractor.trionics import (
maybe_open_nursery, maybe_open_nursery,
) )
@ -58,10 +53,7 @@ if TYPE_CHECKING:
from ._sharedmem import ( from ._sharedmem import (
ShmArray, ShmArray,
) )
from .feed import ( from .feed import _FeedsBus
_FeedsBus,
Sub,
)
# highest frequency sample step is 1 second by default, though in # highest frequency sample step is 1 second by default, though in
@ -102,7 +94,7 @@ class Sampler:
float, float,
list[ list[
float, float,
set[MsgStream] set[tractor.MsgStream]
], ],
] = defaultdict( ] = defaultdict(
lambda: [ lambda: [
@ -266,8 +258,8 @@ class Sampler:
f'broadcasting {period_s} -> {last_ts}\n' f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}' # f'consumers: {subs}'
) )
borked: set[MsgStream] = set() borked: set[tractor.MsgStream] = set()
sent: set[MsgStream] = set() sent: set[tractor.MsgStream] = set()
while True: while True:
try: try:
for stream in (subs - sent): for stream in (subs - sent):
@ -322,7 +314,7 @@ class Sampler:
@tractor.context @tractor.context
async def register_with_sampler( async def register_with_sampler(
ctx: Context, ctx: tractor.Context,
period_s: float, period_s: float,
shms_by_period: dict[float, dict] | None = None, shms_by_period: dict[float, dict] | None = None,
@ -657,7 +649,12 @@ async def sample_and_broadcast(
# eventually block this producer end of the feed and # eventually block this producer end of the feed and
# thus other consumers still attached. # thus other consumers still attached.
sub_key: str = broker_symbol.lower() sub_key: str = broker_symbol.lower()
subs: set[Sub] = bus.get_subs(sub_key) subs: list[
tuple[
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]
] = bus.get_subs(sub_key)
# NOTE: by default the broker backend doesn't append # NOTE: by default the broker backend doesn't append
# it's own "name" into the fqme schema (but maybe it # it's own "name" into the fqme schema (but maybe it
@ -666,40 +663,34 @@ async def sample_and_broadcast(
fqme: str = f'{broker_symbol}.{brokername}' fqme: str = f'{broker_symbol}.{brokername}'
lags: int = 0 lags: int = 0
# XXX TODO XXX: speed up this loop in an AOT compiled # TODO: speed up this loop in an AOT compiled lang (like
# lang (like rust or nim or zig)! # rust or nim or zig) and/or instead of doing a fan out to
# AND/OR instead of doing a fan out to TCP sockets # TCP sockets here, we add a shm-style tick queue which
# here, we add a shm-style tick queue which readers can # readers can pull from instead of placing the burden of
# pull from instead of placing the burden of broadcast # broadcast on solely on this `brokerd` actor. see issues:
# on solely on this `brokerd` actor. see issues:
# - https://github.com/pikers/piker/issues/98 # - https://github.com/pikers/piker/issues/98
# - https://github.com/pikers/piker/issues/107 # - https://github.com/pikers/piker/issues/107
# for (stream, tick_throttle) in subs.copy(): for (stream, tick_throttle) in subs.copy():
for sub in subs.copy():
ipc: MsgStream = sub.ipc
throttle: float = sub.throttle_rate
try: try:
with trio.move_on_after(0.2) as cs: with trio.move_on_after(0.2) as cs:
if throttle: if tick_throttle:
send_chan: trio.abc.SendChannel = sub.send_chan
# this is a send mem chan that likely # this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below. # pushes to the ``uniform_rate_send()`` below.
try: try:
send_chan.send_nowait( stream.send_nowait(
(fqme, quote) (fqme, quote)
) )
except trio.WouldBlock: except trio.WouldBlock:
overruns[sub_key] += 1 overruns[sub_key] += 1
ctx: Context = ipc._ctx ctx = stream._ctx
chan: Channel = ctx.chan chan = ctx.chan
log.warning( log.warning(
f'Feed OVERRUN {sub_key}' f'Feed OVERRUN {sub_key}'
'@{bus.brokername} -> \n' '@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n' f'feed @ {chan.uid}\n'
f'throttle = {throttle} Hz' f'throttle = {tick_throttle} Hz'
) )
if overruns[sub_key] > 6: if overruns[sub_key] > 6:
@ -716,10 +707,10 @@ async def sample_and_broadcast(
f'{sub_key}:' f'{sub_key}:'
f'{ctx.cid}@{chan.uid}' f'{ctx.cid}@{chan.uid}'
) )
await ipc.aclose() await stream.aclose()
raise trio.BrokenResourceError raise trio.BrokenResourceError
else: else:
await ipc.send( await stream.send(
{fqme: quote} {fqme: quote}
) )
@ -733,16 +724,16 @@ async def sample_and_broadcast(
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
): ):
ctx: Context = ipc._ctx ctx = stream._ctx
chan: Channel = ctx.chan chan = ctx.chan
if ctx: if ctx:
log.warning( log.warning(
'Dropped `brokerd`-quotes-feed connection:\n' 'Dropped `brokerd`-quotes-feed connection:\n'
f'{broker_symbol}:' f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}' f'{ctx.cid}@{chan.uid}'
) )
if sub.throttle_rate: if tick_throttle:
assert ipc._closed assert stream._closed
# XXX: do we need to deregister here # XXX: do we need to deregister here
# if it's done in the fee bus code? # if it's done in the fee bus code?
@ -751,7 +742,7 @@ async def sample_and_broadcast(
# since there seems to be some kinda race.. # since there seems to be some kinda race..
bus.remove_subs( bus.remove_subs(
sub_key, sub_key,
{sub}, {(stream, tick_throttle)},
) )
@ -759,7 +750,7 @@ async def uniform_rate_send(
rate: float, rate: float,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
stream: MsgStream, stream: tractor.MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED, task_status: TaskStatus = trio.TASK_STATUS_IGNORED,

View File

@ -28,7 +28,6 @@ module.
from __future__ import annotations from __future__ import annotations
from collections import ( from collections import (
defaultdict, defaultdict,
abc,
) )
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
@ -37,6 +36,7 @@ from types import ModuleType
from typing import ( from typing import (
Any, Any,
AsyncContextManager, AsyncContextManager,
Optional,
Awaitable, Awaitable,
Sequence, Sequence,
) )
@ -76,31 +76,6 @@ from ._sampling import (
) )
class Sub(Struct, frozen=True):
'''
A live feed subscription entry.
Contains meta-data on the remote-actor type (in functionality
terms) as well as refs to IPC streams and sampler runtime
params.
'''
ipc: tractor.MsgStream
send_chan: trio.abc.SendChannel | None = None
# tick throttle rate in Hz; determines how live
# quotes/ticks should be downsampled before relay
# to the receiving remote consumer (process).
throttle_rate: float | None = None
_throttle_cs: trio.CancelScope | None = None
# TODO: actually stash comms info for the far end to allow
# `.tsp`, `.fsp` and `.data._sampling` sub-systems to re-render
# the data view as needed via msging with the `._remote_ctl`
# ipc ctx.
rc_ui: bool = False
class _FeedsBus(Struct): class _FeedsBus(Struct):
''' '''
Data feeds broadcaster and persistence management. Data feeds broadcaster and persistence management.
@ -125,7 +100,13 @@ class _FeedsBus(Struct):
_subscribers: defaultdict[ _subscribers: defaultdict[
str, str,
set[Sub] set[
tuple[
tractor.MsgStream | trio.MemorySendChannel,
# tractor.Context,
float | None, # tick throttle in Hz
]
]
] = defaultdict(set) ] = defaultdict(set)
async def start_task( async def start_task(
@ -159,28 +140,31 @@ class _FeedsBus(Struct):
def get_subs( def get_subs(
self, self,
key: str, key: str,
) -> set[
) -> set[Sub]: tuple[
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]
]:
''' '''
Get the ``set`` of consumer subscription entries for the given key. Get the ``set`` of consumer subscription entries for the given key.
''' '''
return self._subscribers[key] return self._subscribers[key]
def subs_items(self) -> abc.ItemsView[str, set[Sub]]:
return self._subscribers.items()
def add_subs( def add_subs(
self, self,
key: str, key: str,
subs: set[Sub], subs: set[tuple[
tractor.MsgStream | trio.MemorySendChannel,
) -> set[Sub]: float | None, # tick throttle in Hz
]],
) -> set[tuple]:
''' '''
Add a ``set`` of consumer subscription entries for the given key. Add a ``set`` of consumer subscription entries for the given key.
''' '''
_subs: set[Sub] = self._subscribers.setdefault(key, set()) _subs: set[tuple] = self._subscribers[key]
_subs.update(subs) _subs.update(subs)
return _subs return _subs
@ -457,9 +441,8 @@ async def open_feed_bus(
symbols: list[str], # normally expected to the broker-specific fqme symbols: list[str], # normally expected to the broker-specific fqme
loglevel: str = 'error', loglevel: str = 'error',
tick_throttle: float | None = None, tick_throttle: Optional[float] = None,
start_stream: bool = True, start_stream: bool = True,
allow_remote_ctl_ui: bool = False,
) -> dict[ ) -> dict[
str, # fqme str, # fqme
@ -536,10 +519,10 @@ async def open_feed_bus(
# pack for ``.started()`` sync msg # pack for ``.started()`` sync msg
flumes[fqme] = flume flumes[fqme] = flume
# we use the broker-specific fqme (bs_fqme) for the sampler # we use the broker-specific fqme (bs_fqme) for the
# subscription since the backend isn't (yet) expected to # sampler subscription since the backend isn't (yet) expected to
# append it's own name to the fqme, so we filter on keys # append it's own name to the fqme, so we filter on keys which
# which *do not* include that name (e.g .ib) . # *do not* include that name (e.g .ib) .
bus._subscribers.setdefault(bs_fqme, set()) bus._subscribers.setdefault(bs_fqme, set())
# sync feed subscribers with flume handles # sync feed subscribers with flume handles
@ -578,60 +561,49 @@ async def open_feed_bus(
# that the ``sample_and_broadcast()`` task (spawned inside # that the ``sample_and_broadcast()`` task (spawned inside
# ``allocate_persistent_feed()``) will push real-time quote # ``allocate_persistent_feed()``) will push real-time quote
# (ticks) to this new consumer. # (ticks) to this new consumer.
cs: trio.CancelScope | None = None
send: trio.MemorySendChannel | None = None
if tick_throttle: if tick_throttle:
flume.throttle_rate = tick_throttle flume.throttle_rate = tick_throttle
# open a bg task which receives quotes over a mem # open a bg task which receives quotes over a mem chan
# chan and only pushes them to the target # and only pushes them to the target actor-consumer at
# actor-consumer at a max ``tick_throttle`` # a max ``tick_throttle`` instantaneous rate.
# (instantaneous) rate.
send, recv = trio.open_memory_channel(2**10) send, recv = trio.open_memory_channel(2**10)
# NOTE: the ``.send`` channel here is a swapped-in cs = await bus.start_task(
# trio mem chan which gets `.send()`-ed by the normal
# sampler task but instead of being sent directly
# over the IPC msg stream it's the throttle task
# does the work of incrementally forwarding to the
# IPC stream at the throttle rate.
cs: trio.CancelScope = await bus.start_task(
uniform_rate_send, uniform_rate_send,
tick_throttle, tick_throttle,
recv, recv,
stream, stream,
) )
# NOTE: so the ``send`` channel here is actually a swapped
# in trio mem chan which gets pushed by the normal sampler
# task but instead of being sent directly over the IPC msg
# stream it's the throttle task does the work of
# incrementally forwarding to the IPC stream at the throttle
# rate.
send._ctx = ctx # mock internal ``tractor.MsgStream`` ref
sub = (send, tick_throttle)
sub = Sub( else:
ipc=stream, sub = (stream, tick_throttle)
send_chan=send,
throttle_rate=tick_throttle,
_throttle_cs=cs,
rc_ui=allow_remote_ctl_ui,
)
# TODO: add an api for this on the bus? # TODO: add an api for this on the bus?
# maybe use the current task-id to key the sub list that's # maybe use the current task-id to key the sub list that's
# added / removed? Or maybe we can add a general # added / removed? Or maybe we can add a general
# pause-resume by sub-key api? # pause-resume by sub-key api?
bs_fqme = fqme.removesuffix(f'.{brokername}') bs_fqme = fqme.removesuffix(f'.{brokername}')
local_subs.setdefault( local_subs.setdefault(bs_fqme, set()).add(sub)
bs_fqme, bus.add_subs(bs_fqme, {sub})
set()
).add(sub)
bus.add_subs(
bs_fqme,
{sub}
)
# sync caller with all subs registered state # sync caller with all subs registered state
sub_registered.set() sub_registered.set()
uid: tuple[str, str] = ctx.chan.uid uid = ctx.chan.uid
try: try:
# ctrl protocol for start/stop of live quote streams # ctrl protocol for start/stop of quote streams based on UI
# based on UI state (eg. don't need a stream when # state (eg. don't need a stream when a symbol isn't being
# a symbol isn't being displayed). # displayed).
async for msg in stream: async for msg in stream:
if msg == 'pause': if msg == 'pause':
@ -788,7 +760,7 @@ async def install_brokerd_search(
async def maybe_open_feed( async def maybe_open_feed(
fqmes: list[str], fqmes: list[str],
loglevel: str | None = None, loglevel: Optional[str] = None,
**kwargs, **kwargs,
@ -848,8 +820,6 @@ async def open_feed(
start_stream: bool = True, start_stream: bool = True,
tick_throttle: float | None = None, # Hz tick_throttle: float | None = None, # Hz
allow_remote_ctl_ui: bool = False,
) -> Feed: ) -> Feed:
''' '''
Open a "data feed" which provides streamed real-time quotes. Open a "data feed" which provides streamed real-time quotes.
@ -932,12 +902,6 @@ async def open_feed(
# of these stream open sequences sequentially per # of these stream open sequences sequentially per
# backend? .. need some thot! # backend? .. need some thot!
allow_overruns=True, allow_overruns=True,
# NOTE: UI actors (like charts) can allow
# remote control of certain graphics rendering
# capabilities via the
# `.ui._remote_ctl.remote_annotate()` msg loop.
allow_remote_ctl_ui=allow_remote_ctl_ui,
) )
) )

View File

@ -20,12 +20,8 @@ Storage middle-ware CLIs.
""" """
from __future__ import annotations from __future__ import annotations
# from datetime import datetime # from datetime import datetime
# from contextlib import (
# AsyncExitStack,
# )
from pathlib import Path from pathlib import Path
import time import time
from types import ModuleType
import polars as pl import polars as pl
import numpy as np import numpy as np
@ -38,6 +34,7 @@ import typer
from piker.service import open_piker_runtime from piker.service import open_piker_runtime
from piker.cli import cli from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import ( from piker.data import (
ShmArray, ShmArray,
) )
@ -48,7 +45,6 @@ from . import (
from . import ( from . import (
__tsdbs__, __tsdbs__,
open_storage_client, open_storage_client,
StorageClient,
) )
@ -236,8 +232,7 @@ def anal(
@store.command() @store.command()
def ldshm( def ldshm(
fqme: str, fqme: str,
write_parquet: bool = True, write_parquet: bool = False,
reload_parquet_to_shm: bool = True,
) -> None: ) -> None:
''' '''
@ -247,32 +242,15 @@ def ldshm(
''' '''
async def main(): async def main():
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
actl: AnnotCtl
mod: ModuleType
client: StorageClient
async with ( async with (
open_piker_runtime( open_piker_runtime(
'polars_boi', 'polars_boi',
enable_modules=['piker.data._sharedmem'], enable_modules=['piker.data._sharedmem'],
debug_mode=True, debug_mode=True,
), ),
open_storage_client() as (
mod,
client,
),
open_annot_ctl() as actl,
): ):
shm_df: pl.DataFrame | None = None df: pl.DataFrame | None = None
for ( for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):
shmfile,
shm,
# parquet_path,
shm_df,
) in tsp.iter_dfs_from_shms(fqme):
# compute ohlc properties for naming # compute ohlc properties for naming
times: np.ndarray = shm.array['time'] times: np.ndarray = shm.array['time']
@ -297,136 +275,122 @@ def ldshm(
period=period_s, period=period_s,
) )
needs_correction: bool = (
not gaps.is_empty()
or null_segs
)
# TODO: maybe only optionally enter this depending # TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection? # on some CLI flags and/or gap detection?
if needs_correction: if (
for i in range(gaps.height): not gaps.is_empty()
row: pl.DataFrame = gaps[i] or null_segs
):
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
annot_ctl: AnnotCtl
async with open_annot_ctl() as annot_ctl:
for i in range(gaps.height):
# TODO: can we eventually remove this row: pl.DataFrame = gaps[i]
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# the gap's right-most bar's OPEN value # TODO: can we eventually remove this
# at that time (sample) step. # once we figure out why the epoch cols
# dt_end_t: float = dt.timestamp() # don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# TODO: FIX HOW/WHY these aren't matching # the gap's right-most bar's OPEN value
# and are instead off by 4hours (EST # at that time (sample) step.
# vs. UTC?!?!) # dt_end_t: float = dt.timestamp()
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's left-most bar's CLOSE value # TODO: FIX HOW/WHY these aren't matching
# at that time (sample) step. # and are instead off by 4hours (EST
prev_r: pl.DataFrame = df.filter( # vs. UTC?!?!)
pl.col('index') == iend - 1 # end_t: float = row['time']
) # assert (
istart: int = prev_r['index'][0] # dt.timestamp()
# dt_start_t: float = dt_prev.timestamp() # ==
# end_t
# )
# start_t: float = prev_r['time'] # the gap's left-most bar's CLOSE value
# assert ( # at that time (sample) step.
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure prev_r: pl.DataFrame = df.filter(
# and ensure at least as many px-cols pl.col('index') == gaps[0]['index'] - 1
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
)
# async with actl.open_rect(
# ) as aid:
aid: int = await actl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
# write to parquet file?
if (
write_parquet
):
# write to fs
start = time.time()
path: Path = await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period_s,
)
write_delay: float = round(
time.time() - start,
ndigits=6,
)
# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
read_delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {write_delay} secs\n'
f'file path: {path}'
f'parquet read took {read_delay} secs\n'
f'polars df: {read_df}'
)
if reload_parquet_to_shm:
new = tsp.pl2np(
deduped,
dtype=shm.array.dtype,
) )
# since normally readonly istart: int = prev_r['index'][0]
shm._array.setflags( # dt_start_t: float = dt_prev.timestamp()
write=int(1),
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
# await tractor.pause()
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
) )
shm.push( lc: tuple[float, float] = (
new, # dt_start_t,
prepend=True, istart,
start=new['index'][-1], prev_r['close'][0],
update_first=False, # don't update ._first
) )
await tractor.pause() aid: int = await annot_ctl.add_rect(
assert diff fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
await tractor.pause()
else: # write to parquet file?
# allow interaction even when no ts problems. if write_parquet:
await tractor.pause() timeframe: str = f'{period_s}s'
assert not diff
datadir: Path = get_conf_dir() / 'nativedb'
if not datadir.is_dir():
datadir.mkdir()
path: Path = datadir / f'{fqme}.{timeframe}.parquet'
# write to fs
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'parquet read took {delay} secs\n'
f'polars df: {read_df}'
)
if df is None: if df is None:
log.error(f'No matching shm buffers for {fqme} ?') log.error(f'No matching shm buffers for {fqme} ?')

View File

@ -95,19 +95,16 @@ def detect_period(shm: ShmArray) -> float:
def mk_ohlcv_shm_keyed_filepath( def mk_ohlcv_shm_keyed_filepath(
fqme: str, fqme: str,
period: float | int, # ow known as the "timeframe" period: float, # ow known as the "timeframe"
datadir: Path, datadir: Path,
) -> Path: ) -> str:
if period < 1.: if period < 1.:
raise ValueError('Sample period should be >= 1.!?') raise ValueError('Sample period should be >= 1.!?')
path: Path = ( period_s: str = f'{period}s'
datadir path: Path = datadir / f'{fqme}.ohlcv{period_s}.parquet'
/
f'{fqme}.ohlcv{int(period)}s.parquet'
)
return path return path
@ -230,7 +227,6 @@ class NativeStorageClient:
self, self,
fqme: str, fqme: str,
period: float, period: float,
) -> Path: ) -> Path:
return mk_ohlcv_shm_keyed_filepath( return mk_ohlcv_shm_keyed_filepath(
fqme=fqme, fqme=fqme,
@ -243,7 +239,6 @@ class NativeStorageClient:
fqme: str, fqme: str,
df: pl.DataFrame, df: pl.DataFrame,
timeframe: float, timeframe: float,
) -> None: ) -> None:
# cache df for later usage since we (currently) need to # cache df for later usage since we (currently) need to
# convert to np.ndarrays to push to our `ShmArray` rt # convert to np.ndarrays to push to our `ShmArray` rt

View File

@ -120,7 +120,7 @@ from ..storage import TimeseriesNotFound
if TYPE_CHECKING: if TYPE_CHECKING:
from bidict import bidict from bidict import bidict
from ..service.marketstore import StorageClient from ..service.marketstore import StorageClient
# from .feed import _FeedsBus from .feed import _FeedsBus
# `ShmArray` buffer sizing configuration: # `ShmArray` buffer sizing configuration:
@ -1352,7 +1352,9 @@ def iter_dfs_from_shms(
readonly=True, readonly=True,
) )
assert not opened assert not opened
ohlcv: np.ndarray = shm.array ohlcv = shm.array
from ._anal import np2pl
df: pl.DataFrame = np2pl(ohlcv) df: pl.DataFrame = np2pl(ohlcv)
yield ( yield (

View File

@ -620,11 +620,7 @@ def detect_price_gaps(
... ...
def dedupe( def dedupe(src_df: pl.DataFrame) -> tuple[
src_df: pl.DataFrame,
sort: bool = True,
) -> tuple[
pl.DataFrame, # with dts pl.DataFrame, # with dts
pl.DataFrame, # gaps pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal) pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
@ -638,8 +634,6 @@ def dedupe(
''' '''
df: pl.DataFrame = with_dts(src_df) df: pl.DataFrame = with_dts(src_df)
# TODO: enable passing existing `with_dts` df for speedup?
gaps: pl.DataFrame = detect_time_gaps(df) gaps: pl.DataFrame = detect_time_gaps(df)
# if no gaps detected just return carbon copies # if no gaps detected just return carbon copies
@ -657,10 +651,8 @@ def dedupe(
subset=['dt'], subset=['dt'],
maintain_order=True, maintain_order=True,
) )
if sort:
deduped = deduped.sort(by='time')
deduped_gaps: pl.DataFrame = detect_time_gaps(deduped) deduped_gaps = detect_time_gaps(deduped)
diff: int = ( diff: int = (
df.height df.height
@ -668,8 +660,7 @@ def dedupe(
deduped.height deduped.height
) )
log.warning( log.warning(
f'TIME GAPs FOUND:\n' f'Gaps found:\n{gaps}\n'
# f'{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}' f'deduped Gaps found:\n{deduped_gaps}'
) )
return ( return (

View File

@ -471,9 +471,6 @@ async def graphics_update_loop(
await tractor.pause() await tractor.pause()
try: try:
# XXX TODO: we need to do _dss UPDATE here so that when
# a feed-view is switched you can still remote annotate the
# prior view..
from . import _remote_ctl from . import _remote_ctl
_remote_ctl._dss = dss _remote_ctl._dss = dss
@ -529,7 +526,7 @@ async def graphics_update_loop(
finally: finally:
# XXX: cancel any remote annotation control ctxs # XXX: cancel any remote annotation control ctxs
_remote_ctl._dss = None _remote_ctl._dss = None
for cid, (ctx, aids) in _remote_ctl._ctxs.items(): for ctx in _remote_ctl._ctxs:
await ctx.cancel() await ctx.cancel()

View File

@ -314,6 +314,7 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
color.setAlpha(66) color.setAlpha(66)
self.setBrush(fn.mkBrush(color)) self.setBrush(fn.mkBrush(color))
self.setZValue(1e9) self.setZValue(1e9)
self.hide()
label = self._label = QLabel() label = self._label = QLabel()
label.setTextFormat(0) # markdown label.setTextFormat(0) # markdown
@ -342,7 +343,6 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
] ]
self.add_to_view(viewbox) self.add_to_view(viewbox)
self.hide()
def add_to_view( def add_to_view(
self, self,
@ -579,34 +579,10 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
) )
label.show() label.show()
def hide(self): def clear(self):
''' '''
Clear the selection box from its graphics scene but Clear the selection box from view.
don't delete it permanently.
''' '''
super().hide()
self._label.hide() self._label.hide()
self.hide()
# TODO: ensure noone else using dis.
clear = hide
def delete(self) -> None:
'''
De-allocate this rect from its rendering graphics scene.
Like a permanent hide.
'''
scen: QGraphicsScene = self.scene()
if scen is None:
return
scen.removeItem(self)
if (
self._label
and
self._label_proxy
):
scen.removeItem(self._label_proxy)

View File

@ -28,17 +28,7 @@ from typing import (
import pyqtgraph as pg import pyqtgraph as pg
from pyqtgraph import Point, functions as fn from pyqtgraph import Point, functions as fn
from PyQt5 import ( from PyQt5 import QtCore, QtGui, QtWidgets
QtCore,
QtGui,
)
from PyQt5.QtWidgets import (
QGraphicsPathItem,
QStyleOptionGraphicsItem,
QGraphicsItem,
QGraphicsScene,
QWidget,
)
from PyQt5.QtCore import QPointF from PyQt5.QtCore import QPointF
from ._annotate import LevelMarker from ._annotate import LevelMarker
@ -140,7 +130,7 @@ class LevelLine(pg.InfiniteLine):
self._right_end_sc: float = 0 self._right_end_sc: float = 0
# use px caching # use px caching
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache) self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
def txt_offsets(self) -> tuple[int, int]: def txt_offsets(self) -> tuple[int, int]:
return 0, 0 return 0, 0
@ -318,7 +308,7 @@ class LevelLine(pg.InfiniteLine):
Remove this line from containing chart/view/scene. Remove this line from containing chart/view/scene.
''' '''
scene: QGraphicsScene = self.scene() scene = self.scene()
if scene: if scene:
for label in self._labels: for label in self._labels:
label.delete() label.delete()
@ -349,8 +339,8 @@ class LevelLine(pg.InfiniteLine):
self, self,
p: QtGui.QPainter, p: QtGui.QPainter,
opt: QStyleOptionGraphicsItem, opt: QtWidgets.QStyleOptionGraphicsItem,
w: QWidget w: QtWidgets.QWidget
) -> None: ) -> None:
''' '''
@ -427,9 +417,9 @@ class LevelLine(pg.InfiniteLine):
def add_marker( def add_marker(
self, self,
path: QGraphicsPathItem, path: QtWidgets.QGraphicsPathItem,
) -> QGraphicsPathItem: ) -> QtWidgets.QGraphicsPathItem:
self._marker = path self._marker = path
self._marker.setPen(self.currentPen) self._marker.setPen(self.currentPen)

View File

@ -20,14 +20,10 @@ to a chart from some other actor.
''' '''
from __future__ import annotations from __future__ import annotations
from contextlib import ( from contextlib import asynccontextmanager as acm
asynccontextmanager as acm,
AsyncExitStack,
)
from functools import partial
from pprint import pformat from pprint import pformat
from typing import ( from typing import (
# Any, Any,
AsyncContextManager, AsyncContextManager,
) )
@ -45,7 +41,6 @@ from PyQt5.QtWidgets import (
from piker.log import get_logger from piker.log import get_logger
from piker.types import Struct from piker.types import Struct
from piker.service import find_service from piker.service import find_service
from piker.brokers import SymbolNotFound
from ._display import DisplayState from ._display import DisplayState
from ._interaction import ChartView from ._interaction import ChartView
from ._editors import SelectRect from ._editors import SelectRect
@ -64,117 +59,11 @@ _dss: dict[str, DisplayState] | None = None
# be cancelled on shutdown/error. # be cancelled on shutdown/error.
# TODO: make `tractor.Context` hashable via is `.cid: str`? # TODO: make `tractor.Context` hashable via is `.cid: str`?
# _ctxs: set[Context] = set() # _ctxs: set[Context] = set()
# TODO: use type statements from 3.12+ _ctxs: list[Context] = []
IpcCtxTable = dict[
str, # each `Context.cid`
tuple[
Context, # handle for ctx-cancellation
set[int] # set of annotation (instance) ids
]
]
_ctxs: IpcCtxTable = {} # global map of all uniquely created annotation-graphics
# so that they can be mutated (eventually) by a client.
# XXX: global map of all uniquely created annotation-graphics so _annots: dict[int, QGraphicsItem] = {}
# that they can be mutated (eventually) by a client.
# NOTE: this map is only populated on the `chart` actor side (aka
# the "annotations server" which actually renders to a Qt canvas).
# type AnnotsTable = dict[int, QGraphicsItem]
AnnotsTable = dict[int, QGraphicsItem]
_annots: AnnotsTable = {}
async def serve_rc_annots(
ipc_key: str,
annot_req_stream: MsgStream,
dss: dict[str, DisplayState],
ctxs: IpcCtxTable,
annots: AnnotsTable,
) -> None:
async for msg in annot_req_stream:
match msg:
case {
'annot': 'SelectRect',
'fqme': fqme,
'timeframe': timeframe,
'meth': str(meth),
'kwargs': dict(kwargs),
}:
ds: DisplayState = _dss[fqme]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
# TODO: make this more dynamic?
# -[ ] pull from conf.toml?
# -[ ] add `.set_color()` method to type?
# -[ ] make a green/red based on direction
# instead of default static color?
color=kwargs.pop('color', None),
)
# XXX NOTE: this is REQUIRED to set the rect
# resize callback!
rect.chart: ChartPlotWidget = chart
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
aid: int = id(rect)
annots[aid] = rect
aids: set[int] = ctxs[ipc_key][1]
aids.add(aid)
await annot_req_stream.send(aid)
case {
'rm_annot': int(aid),
}:
# NOTE: this is normally entered on
# a client's annotation de-alloc normally
# prior to detach or modify.
annot: QGraphicsItem = annots[aid]
annot.delete()
# respond to client indicating annot
# was indeed deleted.
await annot_req_stream.send(aid)
case {
'fqme': fqme,
'render': int(aid),
'viz_name': str(viz_name),
'timeframe': timeframe,
}:
# | {
# 'backfilling': (str(viz_name), timeframe),
# }:
ds: DisplayState = _dss[viz_name]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
case _:
log.error(
'Unknown remote annotation cmd:\n'
f'{pformat(msg)}'
)
@tractor.context @tractor.context
@ -185,29 +74,62 @@ async def remote_annotate(
global _dss, _ctxs global _dss, _ctxs
assert _dss assert _dss
_ctxs[ctx.cid] = (ctx, set()) _ctxs.append(ctx)
# send back full fqme symbology to caller # send back full fqme symbology to caller
await ctx.started(list(_dss)) await ctx.started(list(_dss))
# open annot request handler stream
async with ctx.open_stream() as annot_req_stream: async with ctx.open_stream() as annot_req_stream:
try: async for msg in annot_req_stream:
await serve_rc_annots( match msg:
ipc_key=ctx.cid, case {
annot_req_stream=annot_req_stream, 'annot': 'SelectRect',
dss=_dss, 'fqme': fqme,
ctxs=_ctxs, 'timeframe': timeframe,
annots=_annots, 'meth': str(meth),
) 'kwargs': dict(kwargs),
finally: }:
# ensure all annots for this connection are deleted
# on any final teardown ds: DisplayState = _dss[fqme]
(_ctx, aids) = _ctxs[ctx.cid] chart: ChartPlotWidget = {
assert _ctx is ctx 60: ds.hist_chart,
for aid in aids: 1: ds.chart,
annot: QGraphicsItem = _annots[aid] }[timeframe]
annot.delete() cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
# TODO: make this more dynamic?
# -[ ] pull from conf.toml?
# -[ ] add `.set_color()` method to type?
# -[ ] make a green/red based on direction
# instead of default static color?
color=kwargs.pop('color', None),
)
# XXX NOTE: this is REQUIRED to set the rect
# resize callback!
rect.chart: ChartPlotWidget = chart
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
await annot_req_stream.send(id(rect))
case _:
log.error(
'Unknown remote annotation cmd:\n'
f'{pformat(msg)}'
)
class AnnotCtl(Struct): class AnnotCtl(Struct):
@ -220,12 +142,7 @@ class AnnotCtl(Struct):
''' '''
ctx2fqmes: dict[str, str] ctx2fqmes: dict[str, str]
fqme2ipc: dict[str, MsgStream] fqme2stream: dict[str, MsgStream]
_annot_stack: AsyncExitStack
# runtime-populated mapping of all annotation
# ids to their equivalent IPC msg-streams.
_ipcs: dict[int, MsgStream] = {}
async def add_rect( async def add_rect(
self, self,
@ -238,21 +155,13 @@ class AnnotCtl(Struct):
domain: str = 'view', # or 'scene' domain: str = 'view', # or 'scene'
color: str = 'dad_blue', color: str = 'dad_blue',
from_acm: bool = False,
) -> int: ) -> int:
''' '''
Add a `SelectRect` annotation to the target view, return Add a `SelectRect` annotation to the target view, return
the instances `id(obj)` from the remote UI actor. the instances `id(obj)` from the remote UI actor.
''' '''
ipc: MsgStream = self.fqme2ipc.get(fqme) ipc: MsgStream = self.fqme2stream[fqme]
if ipc is None:
raise SymbolNotFound(
'No chart (actor) seems to have mkt feed loaded?\n'
f'{fqme}'
)
await ipc.send({ await ipc.send({
'fqme': fqme, 'fqme': fqme,
'annot': 'SelectRect', 'annot': 'SelectRect',
@ -266,61 +175,32 @@ class AnnotCtl(Struct):
'update_label': False, 'update_label': False,
}, },
}) })
aid: int = await ipc.receive() return (await ipc.receive())
self._ipcs[aid] = ipc
if not from_acm: async def modify(
self._annot_stack.push_async_callback( self,
partial( aid: int, # annotation id
self.remove, meth: str, # far end graphics object method to invoke
aid, params: dict[str, Any], # far end `meth(**kwargs)`
) ) -> bool:
) '''
return aid Modify an existing (remote) annotation's graphics
paramters, thus changing it's appearance / state in real
time.
'''
raise NotImplementedError
async def remove( async def remove(
self, self,
aid: int, uid: int,
) -> bool: ) -> bool:
''' '''
Remove an existing annotation by instance id. Remove an existing annotation by instance id.
''' '''
ipc: MsgStream = self._ipcs[aid] raise NotImplementedError
await ipc.send({
'rm_annot': aid,
})
removed: bool = await ipc.receive()
return removed
@acm
async def open_rect(
self,
**kwargs,
) -> int:
try:
aid: int = await self.add_rect(
from_acm=True,
**kwargs,
)
yield aid
finally:
await self.remove(aid)
# TODO: do we even need this?
# async def modify(
# self,
# aid: int, # annotation id
# meth: str, # far end graphics object method to invoke
# params: dict[str, Any], # far end `meth(**kwargs)`
# ) -> bool:
# '''
# Modify an existing (remote) annotation's graphics
# paramters, thus changing it's appearance / state in real
# time.
# '''
# raise NotImplementedError
@acm @acm
@ -355,18 +235,20 @@ async def open_annot_ctl(
) )
ctx2fqmes: dict[str, set[str]] = {} ctx2fqmes: dict[str, set[str]] = {}
fqme2ipc: dict[str, MsgStream] = {} fqme2stream: dict[str, MsgStream] = {}
stream_ctxs: list[AsyncContextManager] = [] client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2stream=fqme2stream,
)
async with ( stream_ctxs: list[AsyncContextManager] = []
trionics.gather_contexts(ctx_mngrs) as ctxs, async with trionics.gather_contexts(ctx_mngrs) as ctxs:
):
for (ctx, fqmes) in ctxs: for (ctx, fqmes) in ctxs:
stream_ctxs.append(ctx.open_stream()) stream_ctxs.append(ctx.open_stream())
# fill lookup table of mkt addrs to IPC ctxs # fill lookup table of mkt addrs to IPC ctxs
for fqme in fqmes: for fqme in fqmes:
if other := fqme2ipc.get(fqme): if other := fqme2stream.get(fqme):
raise ValueError( raise ValueError(
f'More then one chart displays {fqme}!?\n' f'More then one chart displays {fqme}!?\n'
'Other UI actor info:\n' 'Other UI actor info:\n'
@ -384,19 +266,8 @@ async def open_annot_ctl(
for stream in streams: for stream in streams:
fqmes: set[str] = ctx2fqmes[stream._ctx.cid] fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
for fqme in fqmes: for fqme in fqmes:
fqme2ipc[fqme] = stream fqme2stream[fqme] = stream
# NOTE: on graceful teardown we always attempt to yield client
# remove all annots that were created by the # TODO: on graceful teardown should we try to
# entering client. # remove all annots that were created/modded?
# TODO: should we maybe instead/also do this on the
# server-actor side so that when a client
# disconnects we always delete all annotations by
# default instaead of expecting the client to?
async with AsyncExitStack() as annots_stack:
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2ipc=fqme2ipc,
_annot_stack=annots_stack,
)
yield client