Compare commits
10 Commits
a681b2f0bb
...
ad565936ec
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | ad565936ec | |
Tyler Goodlet | d4b07cc95a | |
Tyler Goodlet | 1231c459aa | |
Tyler Goodlet | 88f415e5b8 | |
Tyler Goodlet | d9c574e291 | |
Tyler Goodlet | a86573b5a2 | |
Tyler Goodlet | 1d7e97a295 | |
Tyler Goodlet | bbb98597a0 | |
Tyler Goodlet | e33d6333ec | |
Tyler Goodlet | 263a5a8d07 |
|
@ -33,6 +33,11 @@ from typing import (
|
||||||
)
|
)
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
|
from tractor import (
|
||||||
|
Context,
|
||||||
|
MsgStream,
|
||||||
|
Channel,
|
||||||
|
)
|
||||||
from tractor.trionics import (
|
from tractor.trionics import (
|
||||||
maybe_open_nursery,
|
maybe_open_nursery,
|
||||||
)
|
)
|
||||||
|
@ -53,7 +58,10 @@ if TYPE_CHECKING:
|
||||||
from ._sharedmem import (
|
from ._sharedmem import (
|
||||||
ShmArray,
|
ShmArray,
|
||||||
)
|
)
|
||||||
from .feed import _FeedsBus
|
from .feed import (
|
||||||
|
_FeedsBus,
|
||||||
|
Sub,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# highest frequency sample step is 1 second by default, though in
|
# highest frequency sample step is 1 second by default, though in
|
||||||
|
@ -94,7 +102,7 @@ class Sampler:
|
||||||
float,
|
float,
|
||||||
list[
|
list[
|
||||||
float,
|
float,
|
||||||
set[tractor.MsgStream]
|
set[MsgStream]
|
||||||
],
|
],
|
||||||
] = defaultdict(
|
] = defaultdict(
|
||||||
lambda: [
|
lambda: [
|
||||||
|
@ -258,8 +266,8 @@ class Sampler:
|
||||||
f'broadcasting {period_s} -> {last_ts}\n'
|
f'broadcasting {period_s} -> {last_ts}\n'
|
||||||
# f'consumers: {subs}'
|
# f'consumers: {subs}'
|
||||||
)
|
)
|
||||||
borked: set[tractor.MsgStream] = set()
|
borked: set[MsgStream] = set()
|
||||||
sent: set[tractor.MsgStream] = set()
|
sent: set[MsgStream] = set()
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
for stream in (subs - sent):
|
for stream in (subs - sent):
|
||||||
|
@ -314,7 +322,7 @@ class Sampler:
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def register_with_sampler(
|
async def register_with_sampler(
|
||||||
ctx: tractor.Context,
|
ctx: Context,
|
||||||
period_s: float,
|
period_s: float,
|
||||||
shms_by_period: dict[float, dict] | None = None,
|
shms_by_period: dict[float, dict] | None = None,
|
||||||
|
|
||||||
|
@ -649,12 +657,7 @@ async def sample_and_broadcast(
|
||||||
# eventually block this producer end of the feed and
|
# eventually block this producer end of the feed and
|
||||||
# thus other consumers still attached.
|
# thus other consumers still attached.
|
||||||
sub_key: str = broker_symbol.lower()
|
sub_key: str = broker_symbol.lower()
|
||||||
subs: list[
|
subs: set[Sub] = bus.get_subs(sub_key)
|
||||||
tuple[
|
|
||||||
tractor.MsgStream | trio.MemorySendChannel,
|
|
||||||
float | None, # tick throttle in Hz
|
|
||||||
]
|
|
||||||
] = bus.get_subs(sub_key)
|
|
||||||
|
|
||||||
# NOTE: by default the broker backend doesn't append
|
# NOTE: by default the broker backend doesn't append
|
||||||
# it's own "name" into the fqme schema (but maybe it
|
# it's own "name" into the fqme schema (but maybe it
|
||||||
|
@ -663,34 +666,40 @@ async def sample_and_broadcast(
|
||||||
fqme: str = f'{broker_symbol}.{brokername}'
|
fqme: str = f'{broker_symbol}.{brokername}'
|
||||||
lags: int = 0
|
lags: int = 0
|
||||||
|
|
||||||
# TODO: speed up this loop in an AOT compiled lang (like
|
# XXX TODO XXX: speed up this loop in an AOT compiled
|
||||||
# rust or nim or zig) and/or instead of doing a fan out to
|
# lang (like rust or nim or zig)!
|
||||||
# TCP sockets here, we add a shm-style tick queue which
|
# AND/OR instead of doing a fan out to TCP sockets
|
||||||
# readers can pull from instead of placing the burden of
|
# here, we add a shm-style tick queue which readers can
|
||||||
# broadcast on solely on this `brokerd` actor. see issues:
|
# pull from instead of placing the burden of broadcast
|
||||||
|
# on solely on this `brokerd` actor. see issues:
|
||||||
# - https://github.com/pikers/piker/issues/98
|
# - https://github.com/pikers/piker/issues/98
|
||||||
# - https://github.com/pikers/piker/issues/107
|
# - https://github.com/pikers/piker/issues/107
|
||||||
|
|
||||||
for (stream, tick_throttle) in subs.copy():
|
# for (stream, tick_throttle) in subs.copy():
|
||||||
|
for sub in subs.copy():
|
||||||
|
ipc: MsgStream = sub.ipc
|
||||||
|
throttle: float = sub.throttle_rate
|
||||||
try:
|
try:
|
||||||
with trio.move_on_after(0.2) as cs:
|
with trio.move_on_after(0.2) as cs:
|
||||||
if tick_throttle:
|
if throttle:
|
||||||
|
send_chan: trio.abc.SendChannel = sub.send_chan
|
||||||
|
|
||||||
# this is a send mem chan that likely
|
# this is a send mem chan that likely
|
||||||
# pushes to the ``uniform_rate_send()`` below.
|
# pushes to the ``uniform_rate_send()`` below.
|
||||||
try:
|
try:
|
||||||
stream.send_nowait(
|
send_chan.send_nowait(
|
||||||
(fqme, quote)
|
(fqme, quote)
|
||||||
)
|
)
|
||||||
except trio.WouldBlock:
|
except trio.WouldBlock:
|
||||||
overruns[sub_key] += 1
|
overruns[sub_key] += 1
|
||||||
ctx = stream._ctx
|
ctx: Context = ipc._ctx
|
||||||
chan = ctx.chan
|
chan: Channel = ctx.chan
|
||||||
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Feed OVERRUN {sub_key}'
|
f'Feed OVERRUN {sub_key}'
|
||||||
'@{bus.brokername} -> \n'
|
'@{bus.brokername} -> \n'
|
||||||
f'feed @ {chan.uid}\n'
|
f'feed @ {chan.uid}\n'
|
||||||
f'throttle = {tick_throttle} Hz'
|
f'throttle = {throttle} Hz'
|
||||||
)
|
)
|
||||||
|
|
||||||
if overruns[sub_key] > 6:
|
if overruns[sub_key] > 6:
|
||||||
|
@ -707,10 +716,10 @@ async def sample_and_broadcast(
|
||||||
f'{sub_key}:'
|
f'{sub_key}:'
|
||||||
f'{ctx.cid}@{chan.uid}'
|
f'{ctx.cid}@{chan.uid}'
|
||||||
)
|
)
|
||||||
await stream.aclose()
|
await ipc.aclose()
|
||||||
raise trio.BrokenResourceError
|
raise trio.BrokenResourceError
|
||||||
else:
|
else:
|
||||||
await stream.send(
|
await ipc.send(
|
||||||
{fqme: quote}
|
{fqme: quote}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -724,16 +733,16 @@ async def sample_and_broadcast(
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
trio.EndOfChannel,
|
trio.EndOfChannel,
|
||||||
):
|
):
|
||||||
ctx = stream._ctx
|
ctx: Context = ipc._ctx
|
||||||
chan = ctx.chan
|
chan: Channel = ctx.chan
|
||||||
if ctx:
|
if ctx:
|
||||||
log.warning(
|
log.warning(
|
||||||
'Dropped `brokerd`-quotes-feed connection:\n'
|
'Dropped `brokerd`-quotes-feed connection:\n'
|
||||||
f'{broker_symbol}:'
|
f'{broker_symbol}:'
|
||||||
f'{ctx.cid}@{chan.uid}'
|
f'{ctx.cid}@{chan.uid}'
|
||||||
)
|
)
|
||||||
if tick_throttle:
|
if sub.throttle_rate:
|
||||||
assert stream._closed
|
assert ipc._closed
|
||||||
|
|
||||||
# XXX: do we need to deregister here
|
# XXX: do we need to deregister here
|
||||||
# if it's done in the fee bus code?
|
# if it's done in the fee bus code?
|
||||||
|
@ -742,7 +751,7 @@ async def sample_and_broadcast(
|
||||||
# since there seems to be some kinda race..
|
# since there seems to be some kinda race..
|
||||||
bus.remove_subs(
|
bus.remove_subs(
|
||||||
sub_key,
|
sub_key,
|
||||||
{(stream, tick_throttle)},
|
{sub},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -750,7 +759,7 @@ async def uniform_rate_send(
|
||||||
|
|
||||||
rate: float,
|
rate: float,
|
||||||
quote_stream: trio.abc.ReceiveChannel,
|
quote_stream: trio.abc.ReceiveChannel,
|
||||||
stream: tractor.MsgStream,
|
stream: MsgStream,
|
||||||
|
|
||||||
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ module.
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from collections import (
|
from collections import (
|
||||||
defaultdict,
|
defaultdict,
|
||||||
|
abc,
|
||||||
)
|
)
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
@ -36,7 +37,6 @@ from types import ModuleType
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
AsyncContextManager,
|
AsyncContextManager,
|
||||||
Optional,
|
|
||||||
Awaitable,
|
Awaitable,
|
||||||
Sequence,
|
Sequence,
|
||||||
)
|
)
|
||||||
|
@ -76,6 +76,31 @@ from ._sampling import (
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Sub(Struct, frozen=True):
|
||||||
|
'''
|
||||||
|
A live feed subscription entry.
|
||||||
|
|
||||||
|
Contains meta-data on the remote-actor type (in functionality
|
||||||
|
terms) as well as refs to IPC streams and sampler runtime
|
||||||
|
params.
|
||||||
|
|
||||||
|
'''
|
||||||
|
ipc: tractor.MsgStream
|
||||||
|
send_chan: trio.abc.SendChannel | None = None
|
||||||
|
|
||||||
|
# tick throttle rate in Hz; determines how live
|
||||||
|
# quotes/ticks should be downsampled before relay
|
||||||
|
# to the receiving remote consumer (process).
|
||||||
|
throttle_rate: float | None = None
|
||||||
|
_throttle_cs: trio.CancelScope | None = None
|
||||||
|
|
||||||
|
# TODO: actually stash comms info for the far end to allow
|
||||||
|
# `.tsp`, `.fsp` and `.data._sampling` sub-systems to re-render
|
||||||
|
# the data view as needed via msging with the `._remote_ctl`
|
||||||
|
# ipc ctx.
|
||||||
|
rc_ui: bool = False
|
||||||
|
|
||||||
|
|
||||||
class _FeedsBus(Struct):
|
class _FeedsBus(Struct):
|
||||||
'''
|
'''
|
||||||
Data feeds broadcaster and persistence management.
|
Data feeds broadcaster and persistence management.
|
||||||
|
@ -100,13 +125,7 @@ class _FeedsBus(Struct):
|
||||||
|
|
||||||
_subscribers: defaultdict[
|
_subscribers: defaultdict[
|
||||||
str,
|
str,
|
||||||
set[
|
set[Sub]
|
||||||
tuple[
|
|
||||||
tractor.MsgStream | trio.MemorySendChannel,
|
|
||||||
# tractor.Context,
|
|
||||||
float | None, # tick throttle in Hz
|
|
||||||
]
|
|
||||||
]
|
|
||||||
] = defaultdict(set)
|
] = defaultdict(set)
|
||||||
|
|
||||||
async def start_task(
|
async def start_task(
|
||||||
|
@ -140,31 +159,28 @@ class _FeedsBus(Struct):
|
||||||
def get_subs(
|
def get_subs(
|
||||||
self,
|
self,
|
||||||
key: str,
|
key: str,
|
||||||
) -> set[
|
|
||||||
tuple[
|
) -> set[Sub]:
|
||||||
tractor.MsgStream | trio.MemorySendChannel,
|
|
||||||
float | None, # tick throttle in Hz
|
|
||||||
]
|
|
||||||
]:
|
|
||||||
'''
|
'''
|
||||||
Get the ``set`` of consumer subscription entries for the given key.
|
Get the ``set`` of consumer subscription entries for the given key.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return self._subscribers[key]
|
return self._subscribers[key]
|
||||||
|
|
||||||
|
def subs_items(self) -> abc.ItemsView[str, set[Sub]]:
|
||||||
|
return self._subscribers.items()
|
||||||
|
|
||||||
def add_subs(
|
def add_subs(
|
||||||
self,
|
self,
|
||||||
key: str,
|
key: str,
|
||||||
subs: set[tuple[
|
subs: set[Sub],
|
||||||
tractor.MsgStream | trio.MemorySendChannel,
|
|
||||||
float | None, # tick throttle in Hz
|
) -> set[Sub]:
|
||||||
]],
|
|
||||||
) -> set[tuple]:
|
|
||||||
'''
|
'''
|
||||||
Add a ``set`` of consumer subscription entries for the given key.
|
Add a ``set`` of consumer subscription entries for the given key.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
_subs: set[tuple] = self._subscribers[key]
|
_subs: set[Sub] = self._subscribers.setdefault(key, set())
|
||||||
_subs.update(subs)
|
_subs.update(subs)
|
||||||
return _subs
|
return _subs
|
||||||
|
|
||||||
|
@ -441,8 +457,9 @@ async def open_feed_bus(
|
||||||
symbols: list[str], # normally expected to the broker-specific fqme
|
symbols: list[str], # normally expected to the broker-specific fqme
|
||||||
|
|
||||||
loglevel: str = 'error',
|
loglevel: str = 'error',
|
||||||
tick_throttle: Optional[float] = None,
|
tick_throttle: float | None = None,
|
||||||
start_stream: bool = True,
|
start_stream: bool = True,
|
||||||
|
allow_remote_ctl_ui: bool = False,
|
||||||
|
|
||||||
) -> dict[
|
) -> dict[
|
||||||
str, # fqme
|
str, # fqme
|
||||||
|
@ -519,10 +536,10 @@ async def open_feed_bus(
|
||||||
# pack for ``.started()`` sync msg
|
# pack for ``.started()`` sync msg
|
||||||
flumes[fqme] = flume
|
flumes[fqme] = flume
|
||||||
|
|
||||||
# we use the broker-specific fqme (bs_fqme) for the
|
# we use the broker-specific fqme (bs_fqme) for the sampler
|
||||||
# sampler subscription since the backend isn't (yet) expected to
|
# subscription since the backend isn't (yet) expected to
|
||||||
# append it's own name to the fqme, so we filter on keys which
|
# append it's own name to the fqme, so we filter on keys
|
||||||
# *do not* include that name (e.g .ib) .
|
# which *do not* include that name (e.g .ib) .
|
||||||
bus._subscribers.setdefault(bs_fqme, set())
|
bus._subscribers.setdefault(bs_fqme, set())
|
||||||
|
|
||||||
# sync feed subscribers with flume handles
|
# sync feed subscribers with flume handles
|
||||||
|
@ -561,49 +578,60 @@ async def open_feed_bus(
|
||||||
# that the ``sample_and_broadcast()`` task (spawned inside
|
# that the ``sample_and_broadcast()`` task (spawned inside
|
||||||
# ``allocate_persistent_feed()``) will push real-time quote
|
# ``allocate_persistent_feed()``) will push real-time quote
|
||||||
# (ticks) to this new consumer.
|
# (ticks) to this new consumer.
|
||||||
|
cs: trio.CancelScope | None = None
|
||||||
|
send: trio.MemorySendChannel | None = None
|
||||||
if tick_throttle:
|
if tick_throttle:
|
||||||
flume.throttle_rate = tick_throttle
|
flume.throttle_rate = tick_throttle
|
||||||
|
|
||||||
# open a bg task which receives quotes over a mem chan
|
# open a bg task which receives quotes over a mem
|
||||||
# and only pushes them to the target actor-consumer at
|
# chan and only pushes them to the target
|
||||||
# a max ``tick_throttle`` instantaneous rate.
|
# actor-consumer at a max ``tick_throttle``
|
||||||
|
# (instantaneous) rate.
|
||||||
send, recv = trio.open_memory_channel(2**10)
|
send, recv = trio.open_memory_channel(2**10)
|
||||||
|
|
||||||
cs = await bus.start_task(
|
# NOTE: the ``.send`` channel here is a swapped-in
|
||||||
|
# trio mem chan which gets `.send()`-ed by the normal
|
||||||
|
# sampler task but instead of being sent directly
|
||||||
|
# over the IPC msg stream it's the throttle task
|
||||||
|
# does the work of incrementally forwarding to the
|
||||||
|
# IPC stream at the throttle rate.
|
||||||
|
cs: trio.CancelScope = await bus.start_task(
|
||||||
uniform_rate_send,
|
uniform_rate_send,
|
||||||
tick_throttle,
|
tick_throttle,
|
||||||
recv,
|
recv,
|
||||||
stream,
|
stream,
|
||||||
)
|
)
|
||||||
# NOTE: so the ``send`` channel here is actually a swapped
|
|
||||||
# in trio mem chan which gets pushed by the normal sampler
|
|
||||||
# task but instead of being sent directly over the IPC msg
|
|
||||||
# stream it's the throttle task does the work of
|
|
||||||
# incrementally forwarding to the IPC stream at the throttle
|
|
||||||
# rate.
|
|
||||||
send._ctx = ctx # mock internal ``tractor.MsgStream`` ref
|
|
||||||
sub = (send, tick_throttle)
|
|
||||||
|
|
||||||
else:
|
sub = Sub(
|
||||||
sub = (stream, tick_throttle)
|
ipc=stream,
|
||||||
|
send_chan=send,
|
||||||
|
throttle_rate=tick_throttle,
|
||||||
|
_throttle_cs=cs,
|
||||||
|
rc_ui=allow_remote_ctl_ui,
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: add an api for this on the bus?
|
# TODO: add an api for this on the bus?
|
||||||
# maybe use the current task-id to key the sub list that's
|
# maybe use the current task-id to key the sub list that's
|
||||||
# added / removed? Or maybe we can add a general
|
# added / removed? Or maybe we can add a general
|
||||||
# pause-resume by sub-key api?
|
# pause-resume by sub-key api?
|
||||||
bs_fqme = fqme.removesuffix(f'.{brokername}')
|
bs_fqme = fqme.removesuffix(f'.{brokername}')
|
||||||
local_subs.setdefault(bs_fqme, set()).add(sub)
|
local_subs.setdefault(
|
||||||
bus.add_subs(bs_fqme, {sub})
|
bs_fqme,
|
||||||
|
set()
|
||||||
|
).add(sub)
|
||||||
|
bus.add_subs(
|
||||||
|
bs_fqme,
|
||||||
|
{sub}
|
||||||
|
)
|
||||||
|
|
||||||
# sync caller with all subs registered state
|
# sync caller with all subs registered state
|
||||||
sub_registered.set()
|
sub_registered.set()
|
||||||
|
|
||||||
uid = ctx.chan.uid
|
uid: tuple[str, str] = ctx.chan.uid
|
||||||
try:
|
try:
|
||||||
# ctrl protocol for start/stop of quote streams based on UI
|
# ctrl protocol for start/stop of live quote streams
|
||||||
# state (eg. don't need a stream when a symbol isn't being
|
# based on UI state (eg. don't need a stream when
|
||||||
# displayed).
|
# a symbol isn't being displayed).
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
|
|
||||||
if msg == 'pause':
|
if msg == 'pause':
|
||||||
|
@ -760,7 +788,7 @@ async def install_brokerd_search(
|
||||||
async def maybe_open_feed(
|
async def maybe_open_feed(
|
||||||
|
|
||||||
fqmes: list[str],
|
fqmes: list[str],
|
||||||
loglevel: Optional[str] = None,
|
loglevel: str | None = None,
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
|
@ -820,6 +848,8 @@ async def open_feed(
|
||||||
start_stream: bool = True,
|
start_stream: bool = True,
|
||||||
tick_throttle: float | None = None, # Hz
|
tick_throttle: float | None = None, # Hz
|
||||||
|
|
||||||
|
allow_remote_ctl_ui: bool = False,
|
||||||
|
|
||||||
) -> Feed:
|
) -> Feed:
|
||||||
'''
|
'''
|
||||||
Open a "data feed" which provides streamed real-time quotes.
|
Open a "data feed" which provides streamed real-time quotes.
|
||||||
|
@ -902,6 +932,12 @@ async def open_feed(
|
||||||
# of these stream open sequences sequentially per
|
# of these stream open sequences sequentially per
|
||||||
# backend? .. need some thot!
|
# backend? .. need some thot!
|
||||||
allow_overruns=True,
|
allow_overruns=True,
|
||||||
|
|
||||||
|
# NOTE: UI actors (like charts) can allow
|
||||||
|
# remote control of certain graphics rendering
|
||||||
|
# capabilities via the
|
||||||
|
# `.ui._remote_ctl.remote_annotate()` msg loop.
|
||||||
|
allow_remote_ctl_ui=allow_remote_ctl_ui,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -20,8 +20,12 @@ Storage middle-ware CLIs.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
# from datetime import datetime
|
# from datetime import datetime
|
||||||
|
# from contextlib import (
|
||||||
|
# AsyncExitStack,
|
||||||
|
# )
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import time
|
import time
|
||||||
|
from types import ModuleType
|
||||||
|
|
||||||
import polars as pl
|
import polars as pl
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
@ -34,7 +38,6 @@ import typer
|
||||||
|
|
||||||
from piker.service import open_piker_runtime
|
from piker.service import open_piker_runtime
|
||||||
from piker.cli import cli
|
from piker.cli import cli
|
||||||
from piker.config import get_conf_dir
|
|
||||||
from piker.data import (
|
from piker.data import (
|
||||||
ShmArray,
|
ShmArray,
|
||||||
)
|
)
|
||||||
|
@ -45,6 +48,7 @@ from . import (
|
||||||
from . import (
|
from . import (
|
||||||
__tsdbs__,
|
__tsdbs__,
|
||||||
open_storage_client,
|
open_storage_client,
|
||||||
|
StorageClient,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -232,7 +236,8 @@ def anal(
|
||||||
@store.command()
|
@store.command()
|
||||||
def ldshm(
|
def ldshm(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
write_parquet: bool = False,
|
write_parquet: bool = True,
|
||||||
|
reload_parquet_to_shm: bool = True,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -242,15 +247,32 @@ def ldshm(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
from piker.ui._remote_ctl import (
|
||||||
|
open_annot_ctl,
|
||||||
|
AnnotCtl,
|
||||||
|
)
|
||||||
|
actl: AnnotCtl
|
||||||
|
mod: ModuleType
|
||||||
|
client: StorageClient
|
||||||
async with (
|
async with (
|
||||||
open_piker_runtime(
|
open_piker_runtime(
|
||||||
'polars_boi',
|
'polars_boi',
|
||||||
enable_modules=['piker.data._sharedmem'],
|
enable_modules=['piker.data._sharedmem'],
|
||||||
debug_mode=True,
|
debug_mode=True,
|
||||||
),
|
),
|
||||||
|
open_storage_client() as (
|
||||||
|
mod,
|
||||||
|
client,
|
||||||
|
),
|
||||||
|
open_annot_ctl() as actl,
|
||||||
):
|
):
|
||||||
df: pl.DataFrame | None = None
|
shm_df: pl.DataFrame | None = None
|
||||||
for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):
|
for (
|
||||||
|
shmfile,
|
||||||
|
shm,
|
||||||
|
# parquet_path,
|
||||||
|
shm_df,
|
||||||
|
) in tsp.iter_dfs_from_shms(fqme):
|
||||||
|
|
||||||
# compute ohlc properties for naming
|
# compute ohlc properties for naming
|
||||||
times: np.ndarray = shm.array['time']
|
times: np.ndarray = shm.array['time']
|
||||||
|
@ -275,122 +297,136 @@ def ldshm(
|
||||||
period=period_s,
|
period=period_s,
|
||||||
)
|
)
|
||||||
|
|
||||||
# TODO: maybe only optionally enter this depending
|
needs_correction: bool = (
|
||||||
# on some CLI flags and/or gap detection?
|
|
||||||
if (
|
|
||||||
not gaps.is_empty()
|
not gaps.is_empty()
|
||||||
or null_segs
|
or null_segs
|
||||||
):
|
)
|
||||||
from piker.ui._remote_ctl import (
|
# TODO: maybe only optionally enter this depending
|
||||||
open_annot_ctl,
|
# on some CLI flags and/or gap detection?
|
||||||
AnnotCtl,
|
if needs_correction:
|
||||||
)
|
for i in range(gaps.height):
|
||||||
annot_ctl: AnnotCtl
|
row: pl.DataFrame = gaps[i]
|
||||||
async with open_annot_ctl() as annot_ctl:
|
|
||||||
for i in range(gaps.height):
|
|
||||||
|
|
||||||
row: pl.DataFrame = gaps[i]
|
# TODO: can we eventually remove this
|
||||||
|
# once we figure out why the epoch cols
|
||||||
|
# don't match?
|
||||||
|
iend: int = row['index'][0]
|
||||||
|
# dt: datetime = row['dt'][0]
|
||||||
|
# dt_prev: datetime = row['dt_prev'][0]
|
||||||
|
|
||||||
# TODO: can we eventually remove this
|
# the gap's right-most bar's OPEN value
|
||||||
# once we figure out why the epoch cols
|
# at that time (sample) step.
|
||||||
# don't match?
|
# dt_end_t: float = dt.timestamp()
|
||||||
iend: int = row['index'][0]
|
|
||||||
# dt: datetime = row['dt'][0]
|
|
||||||
# dt_prev: datetime = row['dt_prev'][0]
|
|
||||||
|
|
||||||
# the gap's right-most bar's OPEN value
|
# TODO: FIX HOW/WHY these aren't matching
|
||||||
# at that time (sample) step.
|
# and are instead off by 4hours (EST
|
||||||
# dt_end_t: float = dt.timestamp()
|
# vs. UTC?!?!)
|
||||||
|
# end_t: float = row['time']
|
||||||
|
# assert (
|
||||||
|
# dt.timestamp()
|
||||||
|
# ==
|
||||||
|
# end_t
|
||||||
|
# )
|
||||||
|
|
||||||
# TODO: FIX HOW/WHY these aren't matching
|
# the gap's left-most bar's CLOSE value
|
||||||
# and are instead off by 4hours (EST
|
# at that time (sample) step.
|
||||||
# vs. UTC?!?!)
|
prev_r: pl.DataFrame = df.filter(
|
||||||
# end_t: float = row['time']
|
pl.col('index') == iend - 1
|
||||||
# assert (
|
)
|
||||||
# dt.timestamp()
|
istart: int = prev_r['index'][0]
|
||||||
# ==
|
# dt_start_t: float = dt_prev.timestamp()
|
||||||
# end_t
|
|
||||||
# )
|
|
||||||
|
|
||||||
# the gap's left-most bar's CLOSE value
|
# start_t: float = prev_r['time']
|
||||||
# at that time (sample) step.
|
# assert (
|
||||||
|
# dt_start_t
|
||||||
|
# ==
|
||||||
|
# start_t
|
||||||
|
# )
|
||||||
|
|
||||||
prev_r: pl.DataFrame = df.filter(
|
# TODO: implement px-col width measure
|
||||||
pl.col('index') == gaps[0]['index'] - 1
|
# and ensure at least as many px-cols
|
||||||
|
# shown per rect as configured by user.
|
||||||
|
gap_w: float = abs((iend - istart))
|
||||||
|
if gap_w < 6:
|
||||||
|
margin: float = 6
|
||||||
|
iend += margin
|
||||||
|
istart -= margin
|
||||||
|
|
||||||
|
ro: tuple[float, float] = (
|
||||||
|
# dt_end_t,
|
||||||
|
iend,
|
||||||
|
row['open'][0],
|
||||||
|
)
|
||||||
|
lc: tuple[float, float] = (
|
||||||
|
# dt_start_t,
|
||||||
|
istart,
|
||||||
|
prev_r['close'][0],
|
||||||
|
)
|
||||||
|
|
||||||
|
# async with actl.open_rect(
|
||||||
|
# ) as aid:
|
||||||
|
aid: int = await actl.add_rect(
|
||||||
|
fqme=fqme,
|
||||||
|
timeframe=period_s,
|
||||||
|
start_pos=lc,
|
||||||
|
end_pos=ro,
|
||||||
|
)
|
||||||
|
assert aid
|
||||||
|
|
||||||
|
# write to parquet file?
|
||||||
|
if (
|
||||||
|
write_parquet
|
||||||
|
):
|
||||||
|
# write to fs
|
||||||
|
start = time.time()
|
||||||
|
path: Path = await client.write_ohlcv(
|
||||||
|
fqme,
|
||||||
|
ohlcv=deduped,
|
||||||
|
timeframe=period_s,
|
||||||
|
)
|
||||||
|
write_delay: float = round(
|
||||||
|
time.time() - start,
|
||||||
|
ndigits=6,
|
||||||
|
)
|
||||||
|
|
||||||
|
# read back from fs
|
||||||
|
start = time.time()
|
||||||
|
read_df: pl.DataFrame = pl.read_parquet(path)
|
||||||
|
read_delay: float = round(
|
||||||
|
time.time() - start,
|
||||||
|
ndigits=6,
|
||||||
|
)
|
||||||
|
log.info(
|
||||||
|
f'parquet write took {write_delay} secs\n'
|
||||||
|
f'file path: {path}'
|
||||||
|
f'parquet read took {read_delay} secs\n'
|
||||||
|
f'polars df: {read_df}'
|
||||||
|
)
|
||||||
|
|
||||||
|
if reload_parquet_to_shm:
|
||||||
|
new = tsp.pl2np(
|
||||||
|
deduped,
|
||||||
|
dtype=shm.array.dtype,
|
||||||
)
|
)
|
||||||
istart: int = prev_r['index'][0]
|
# since normally readonly
|
||||||
# dt_start_t: float = dt_prev.timestamp()
|
shm._array.setflags(
|
||||||
|
write=int(1),
|
||||||
# start_t: float = prev_r['time']
|
|
||||||
# assert (
|
|
||||||
# dt_start_t
|
|
||||||
# ==
|
|
||||||
# start_t
|
|
||||||
# )
|
|
||||||
|
|
||||||
# TODO: implement px-col width measure
|
|
||||||
# and ensure at least as many px-cols
|
|
||||||
# shown per rect as configured by user.
|
|
||||||
gap_w: float = abs((iend - istart))
|
|
||||||
# await tractor.pause()
|
|
||||||
if gap_w < 6:
|
|
||||||
margin: float = 6
|
|
||||||
iend += margin
|
|
||||||
istart -= margin
|
|
||||||
|
|
||||||
ro: tuple[float, float] = (
|
|
||||||
# dt_end_t,
|
|
||||||
iend,
|
|
||||||
row['open'][0],
|
|
||||||
)
|
)
|
||||||
lc: tuple[float, float] = (
|
shm.push(
|
||||||
# dt_start_t,
|
new,
|
||||||
istart,
|
prepend=True,
|
||||||
prev_r['close'][0],
|
start=new['index'][-1],
|
||||||
|
update_first=False, # don't update ._first
|
||||||
)
|
)
|
||||||
|
|
||||||
aid: int = await annot_ctl.add_rect(
|
await tractor.pause()
|
||||||
fqme=fqme,
|
assert diff
|
||||||
timeframe=period_s,
|
|
||||||
start_pos=lc,
|
|
||||||
end_pos=ro,
|
|
||||||
)
|
|
||||||
assert aid
|
|
||||||
await tractor.pause()
|
|
||||||
|
|
||||||
# write to parquet file?
|
else:
|
||||||
if write_parquet:
|
# allow interaction even when no ts problems.
|
||||||
timeframe: str = f'{period_s}s'
|
await tractor.pause()
|
||||||
|
assert not diff
|
||||||
|
|
||||||
datadir: Path = get_conf_dir() / 'nativedb'
|
|
||||||
if not datadir.is_dir():
|
|
||||||
datadir.mkdir()
|
|
||||||
|
|
||||||
path: Path = datadir / f'{fqme}.{timeframe}.parquet'
|
|
||||||
|
|
||||||
# write to fs
|
|
||||||
start = time.time()
|
|
||||||
df.write_parquet(path)
|
|
||||||
delay: float = round(
|
|
||||||
time.time() - start,
|
|
||||||
ndigits=6,
|
|
||||||
)
|
|
||||||
log.info(
|
|
||||||
f'parquet write took {delay} secs\n'
|
|
||||||
f'file path: {path}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# read back from fs
|
|
||||||
start = time.time()
|
|
||||||
read_df: pl.DataFrame = pl.read_parquet(path)
|
|
||||||
delay: float = round(
|
|
||||||
time.time() - start,
|
|
||||||
ndigits=6,
|
|
||||||
)
|
|
||||||
print(
|
|
||||||
f'parquet read took {delay} secs\n'
|
|
||||||
f'polars df: {read_df}'
|
|
||||||
)
|
|
||||||
|
|
||||||
if df is None:
|
if df is None:
|
||||||
log.error(f'No matching shm buffers for {fqme} ?')
|
log.error(f'No matching shm buffers for {fqme} ?')
|
||||||
|
|
|
@ -95,16 +95,19 @@ def detect_period(shm: ShmArray) -> float:
|
||||||
|
|
||||||
def mk_ohlcv_shm_keyed_filepath(
|
def mk_ohlcv_shm_keyed_filepath(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
period: float, # ow known as the "timeframe"
|
period: float | int, # ow known as the "timeframe"
|
||||||
datadir: Path,
|
datadir: Path,
|
||||||
|
|
||||||
) -> str:
|
) -> Path:
|
||||||
|
|
||||||
if period < 1.:
|
if period < 1.:
|
||||||
raise ValueError('Sample period should be >= 1.!?')
|
raise ValueError('Sample period should be >= 1.!?')
|
||||||
|
|
||||||
period_s: str = f'{period}s'
|
path: Path = (
|
||||||
path: Path = datadir / f'{fqme}.ohlcv{period_s}.parquet'
|
datadir
|
||||||
|
/
|
||||||
|
f'{fqme}.ohlcv{int(period)}s.parquet'
|
||||||
|
)
|
||||||
return path
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
@ -227,6 +230,7 @@ class NativeStorageClient:
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
period: float,
|
period: float,
|
||||||
|
|
||||||
) -> Path:
|
) -> Path:
|
||||||
return mk_ohlcv_shm_keyed_filepath(
|
return mk_ohlcv_shm_keyed_filepath(
|
||||||
fqme=fqme,
|
fqme=fqme,
|
||||||
|
@ -239,6 +243,7 @@ class NativeStorageClient:
|
||||||
fqme: str,
|
fqme: str,
|
||||||
df: pl.DataFrame,
|
df: pl.DataFrame,
|
||||||
timeframe: float,
|
timeframe: float,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# cache df for later usage since we (currently) need to
|
# cache df for later usage since we (currently) need to
|
||||||
# convert to np.ndarrays to push to our `ShmArray` rt
|
# convert to np.ndarrays to push to our `ShmArray` rt
|
||||||
|
|
|
@ -120,7 +120,7 @@ from ..storage import TimeseriesNotFound
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
from ..service.marketstore import StorageClient
|
from ..service.marketstore import StorageClient
|
||||||
from .feed import _FeedsBus
|
# from .feed import _FeedsBus
|
||||||
|
|
||||||
|
|
||||||
# `ShmArray` buffer sizing configuration:
|
# `ShmArray` buffer sizing configuration:
|
||||||
|
@ -1352,9 +1352,7 @@ def iter_dfs_from_shms(
|
||||||
readonly=True,
|
readonly=True,
|
||||||
)
|
)
|
||||||
assert not opened
|
assert not opened
|
||||||
ohlcv = shm.array
|
ohlcv: np.ndarray = shm.array
|
||||||
|
|
||||||
from ._anal import np2pl
|
|
||||||
df: pl.DataFrame = np2pl(ohlcv)
|
df: pl.DataFrame = np2pl(ohlcv)
|
||||||
|
|
||||||
yield (
|
yield (
|
||||||
|
|
|
@ -620,7 +620,11 @@ def detect_price_gaps(
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
def dedupe(src_df: pl.DataFrame) -> tuple[
|
def dedupe(
|
||||||
|
src_df: pl.DataFrame,
|
||||||
|
sort: bool = True,
|
||||||
|
|
||||||
|
) -> tuple[
|
||||||
pl.DataFrame, # with dts
|
pl.DataFrame, # with dts
|
||||||
pl.DataFrame, # gaps
|
pl.DataFrame, # gaps
|
||||||
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
|
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
|
||||||
|
@ -634,6 +638,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
|
||||||
|
|
||||||
'''
|
'''
|
||||||
df: pl.DataFrame = with_dts(src_df)
|
df: pl.DataFrame = with_dts(src_df)
|
||||||
|
|
||||||
|
# TODO: enable passing existing `with_dts` df for speedup?
|
||||||
gaps: pl.DataFrame = detect_time_gaps(df)
|
gaps: pl.DataFrame = detect_time_gaps(df)
|
||||||
|
|
||||||
# if no gaps detected just return carbon copies
|
# if no gaps detected just return carbon copies
|
||||||
|
@ -651,8 +657,10 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
|
||||||
subset=['dt'],
|
subset=['dt'],
|
||||||
maintain_order=True,
|
maintain_order=True,
|
||||||
)
|
)
|
||||||
|
if sort:
|
||||||
|
deduped = deduped.sort(by='time')
|
||||||
|
|
||||||
deduped_gaps = detect_time_gaps(deduped)
|
deduped_gaps: pl.DataFrame = detect_time_gaps(deduped)
|
||||||
|
|
||||||
diff: int = (
|
diff: int = (
|
||||||
df.height
|
df.height
|
||||||
|
@ -660,7 +668,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
|
||||||
deduped.height
|
deduped.height
|
||||||
)
|
)
|
||||||
log.warning(
|
log.warning(
|
||||||
f'Gaps found:\n{gaps}\n'
|
f'TIME GAPs FOUND:\n'
|
||||||
|
# f'{gaps}\n'
|
||||||
f'deduped Gaps found:\n{deduped_gaps}'
|
f'deduped Gaps found:\n{deduped_gaps}'
|
||||||
)
|
)
|
||||||
return (
|
return (
|
||||||
|
|
|
@ -471,6 +471,9 @@ async def graphics_update_loop(
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# XXX TODO: we need to do _dss UPDATE here so that when
|
||||||
|
# a feed-view is switched you can still remote annotate the
|
||||||
|
# prior view..
|
||||||
from . import _remote_ctl
|
from . import _remote_ctl
|
||||||
_remote_ctl._dss = dss
|
_remote_ctl._dss = dss
|
||||||
|
|
||||||
|
@ -526,7 +529,7 @@ async def graphics_update_loop(
|
||||||
finally:
|
finally:
|
||||||
# XXX: cancel any remote annotation control ctxs
|
# XXX: cancel any remote annotation control ctxs
|
||||||
_remote_ctl._dss = None
|
_remote_ctl._dss = None
|
||||||
for ctx in _remote_ctl._ctxs:
|
for cid, (ctx, aids) in _remote_ctl._ctxs.items():
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -314,7 +314,6 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
||||||
color.setAlpha(66)
|
color.setAlpha(66)
|
||||||
self.setBrush(fn.mkBrush(color))
|
self.setBrush(fn.mkBrush(color))
|
||||||
self.setZValue(1e9)
|
self.setZValue(1e9)
|
||||||
self.hide()
|
|
||||||
|
|
||||||
label = self._label = QLabel()
|
label = self._label = QLabel()
|
||||||
label.setTextFormat(0) # markdown
|
label.setTextFormat(0) # markdown
|
||||||
|
@ -343,6 +342,7 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
||||||
]
|
]
|
||||||
|
|
||||||
self.add_to_view(viewbox)
|
self.add_to_view(viewbox)
|
||||||
|
self.hide()
|
||||||
|
|
||||||
def add_to_view(
|
def add_to_view(
|
||||||
self,
|
self,
|
||||||
|
@ -579,10 +579,34 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
||||||
)
|
)
|
||||||
label.show()
|
label.show()
|
||||||
|
|
||||||
def clear(self):
|
def hide(self):
|
||||||
'''
|
'''
|
||||||
Clear the selection box from view.
|
Clear the selection box from its graphics scene but
|
||||||
|
don't delete it permanently.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
super().hide()
|
||||||
self._label.hide()
|
self._label.hide()
|
||||||
self.hide()
|
|
||||||
|
# TODO: ensure noone else using dis.
|
||||||
|
clear = hide
|
||||||
|
|
||||||
|
def delete(self) -> None:
|
||||||
|
'''
|
||||||
|
De-allocate this rect from its rendering graphics scene.
|
||||||
|
|
||||||
|
Like a permanent hide.
|
||||||
|
|
||||||
|
'''
|
||||||
|
scen: QGraphicsScene = self.scene()
|
||||||
|
if scen is None:
|
||||||
|
return
|
||||||
|
|
||||||
|
scen.removeItem(self)
|
||||||
|
if (
|
||||||
|
self._label
|
||||||
|
and
|
||||||
|
self._label_proxy
|
||||||
|
|
||||||
|
):
|
||||||
|
scen.removeItem(self._label_proxy)
|
||||||
|
|
|
@ -28,7 +28,17 @@ from typing import (
|
||||||
|
|
||||||
import pyqtgraph as pg
|
import pyqtgraph as pg
|
||||||
from pyqtgraph import Point, functions as fn
|
from pyqtgraph import Point, functions as fn
|
||||||
from PyQt5 import QtCore, QtGui, QtWidgets
|
from PyQt5 import (
|
||||||
|
QtCore,
|
||||||
|
QtGui,
|
||||||
|
)
|
||||||
|
from PyQt5.QtWidgets import (
|
||||||
|
QGraphicsPathItem,
|
||||||
|
QStyleOptionGraphicsItem,
|
||||||
|
QGraphicsItem,
|
||||||
|
QGraphicsScene,
|
||||||
|
QWidget,
|
||||||
|
)
|
||||||
from PyQt5.QtCore import QPointF
|
from PyQt5.QtCore import QPointF
|
||||||
|
|
||||||
from ._annotate import LevelMarker
|
from ._annotate import LevelMarker
|
||||||
|
@ -130,7 +140,7 @@ class LevelLine(pg.InfiniteLine):
|
||||||
self._right_end_sc: float = 0
|
self._right_end_sc: float = 0
|
||||||
|
|
||||||
# use px caching
|
# use px caching
|
||||||
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
|
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
|
||||||
|
|
||||||
def txt_offsets(self) -> tuple[int, int]:
|
def txt_offsets(self) -> tuple[int, int]:
|
||||||
return 0, 0
|
return 0, 0
|
||||||
|
@ -308,7 +318,7 @@ class LevelLine(pg.InfiniteLine):
|
||||||
Remove this line from containing chart/view/scene.
|
Remove this line from containing chart/view/scene.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
scene = self.scene()
|
scene: QGraphicsScene = self.scene()
|
||||||
if scene:
|
if scene:
|
||||||
for label in self._labels:
|
for label in self._labels:
|
||||||
label.delete()
|
label.delete()
|
||||||
|
@ -339,8 +349,8 @@ class LevelLine(pg.InfiniteLine):
|
||||||
self,
|
self,
|
||||||
|
|
||||||
p: QtGui.QPainter,
|
p: QtGui.QPainter,
|
||||||
opt: QtWidgets.QStyleOptionGraphicsItem,
|
opt: QStyleOptionGraphicsItem,
|
||||||
w: QtWidgets.QWidget
|
w: QWidget
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -417,9 +427,9 @@ class LevelLine(pg.InfiniteLine):
|
||||||
|
|
||||||
def add_marker(
|
def add_marker(
|
||||||
self,
|
self,
|
||||||
path: QtWidgets.QGraphicsPathItem,
|
path: QGraphicsPathItem,
|
||||||
|
|
||||||
) -> QtWidgets.QGraphicsPathItem:
|
) -> QGraphicsPathItem:
|
||||||
|
|
||||||
self._marker = path
|
self._marker = path
|
||||||
self._marker.setPen(self.currentPen)
|
self._marker.setPen(self.currentPen)
|
||||||
|
|
|
@ -20,10 +20,14 @@ to a chart from some other actor.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
AsyncExitStack,
|
||||||
|
)
|
||||||
|
from functools import partial
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
# Any,
|
||||||
AsyncContextManager,
|
AsyncContextManager,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -41,6 +45,7 @@ from PyQt5.QtWidgets import (
|
||||||
from piker.log import get_logger
|
from piker.log import get_logger
|
||||||
from piker.types import Struct
|
from piker.types import Struct
|
||||||
from piker.service import find_service
|
from piker.service import find_service
|
||||||
|
from piker.brokers import SymbolNotFound
|
||||||
from ._display import DisplayState
|
from ._display import DisplayState
|
||||||
from ._interaction import ChartView
|
from ._interaction import ChartView
|
||||||
from ._editors import SelectRect
|
from ._editors import SelectRect
|
||||||
|
@ -59,11 +64,117 @@ _dss: dict[str, DisplayState] | None = None
|
||||||
# be cancelled on shutdown/error.
|
# be cancelled on shutdown/error.
|
||||||
# TODO: make `tractor.Context` hashable via is `.cid: str`?
|
# TODO: make `tractor.Context` hashable via is `.cid: str`?
|
||||||
# _ctxs: set[Context] = set()
|
# _ctxs: set[Context] = set()
|
||||||
_ctxs: list[Context] = []
|
# TODO: use type statements from 3.12+
|
||||||
|
IpcCtxTable = dict[
|
||||||
|
str, # each `Context.cid`
|
||||||
|
tuple[
|
||||||
|
Context, # handle for ctx-cancellation
|
||||||
|
set[int] # set of annotation (instance) ids
|
||||||
|
]
|
||||||
|
]
|
||||||
|
|
||||||
# global map of all uniquely created annotation-graphics
|
_ctxs: IpcCtxTable = {}
|
||||||
# so that they can be mutated (eventually) by a client.
|
|
||||||
_annots: dict[int, QGraphicsItem] = {}
|
# XXX: global map of all uniquely created annotation-graphics so
|
||||||
|
# that they can be mutated (eventually) by a client.
|
||||||
|
# NOTE: this map is only populated on the `chart` actor side (aka
|
||||||
|
# the "annotations server" which actually renders to a Qt canvas).
|
||||||
|
# type AnnotsTable = dict[int, QGraphicsItem]
|
||||||
|
AnnotsTable = dict[int, QGraphicsItem]
|
||||||
|
|
||||||
|
_annots: AnnotsTable = {}
|
||||||
|
|
||||||
|
|
||||||
|
async def serve_rc_annots(
|
||||||
|
ipc_key: str,
|
||||||
|
annot_req_stream: MsgStream,
|
||||||
|
dss: dict[str, DisplayState],
|
||||||
|
ctxs: IpcCtxTable,
|
||||||
|
annots: AnnotsTable,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
async for msg in annot_req_stream:
|
||||||
|
match msg:
|
||||||
|
case {
|
||||||
|
'annot': 'SelectRect',
|
||||||
|
'fqme': fqme,
|
||||||
|
'timeframe': timeframe,
|
||||||
|
'meth': str(meth),
|
||||||
|
'kwargs': dict(kwargs),
|
||||||
|
}:
|
||||||
|
|
||||||
|
ds: DisplayState = _dss[fqme]
|
||||||
|
chart: ChartPlotWidget = {
|
||||||
|
60: ds.hist_chart,
|
||||||
|
1: ds.chart,
|
||||||
|
}[timeframe]
|
||||||
|
cv: ChartView = chart.cv
|
||||||
|
|
||||||
|
# sanity
|
||||||
|
if timeframe == 60:
|
||||||
|
assert (
|
||||||
|
chart.linked.godwidget.hist_linked.chart.view
|
||||||
|
is
|
||||||
|
cv
|
||||||
|
)
|
||||||
|
|
||||||
|
# annot type lookup from cmd
|
||||||
|
rect = SelectRect(
|
||||||
|
viewbox=cv,
|
||||||
|
|
||||||
|
# TODO: make this more dynamic?
|
||||||
|
# -[ ] pull from conf.toml?
|
||||||
|
# -[ ] add `.set_color()` method to type?
|
||||||
|
# -[ ] make a green/red based on direction
|
||||||
|
# instead of default static color?
|
||||||
|
color=kwargs.pop('color', None),
|
||||||
|
)
|
||||||
|
# XXX NOTE: this is REQUIRED to set the rect
|
||||||
|
# resize callback!
|
||||||
|
rect.chart: ChartPlotWidget = chart
|
||||||
|
|
||||||
|
# delegate generically to the requested method
|
||||||
|
getattr(rect, meth)(**kwargs)
|
||||||
|
rect.show()
|
||||||
|
aid: int = id(rect)
|
||||||
|
annots[aid] = rect
|
||||||
|
aids: set[int] = ctxs[ipc_key][1]
|
||||||
|
aids.add(aid)
|
||||||
|
await annot_req_stream.send(aid)
|
||||||
|
|
||||||
|
case {
|
||||||
|
'rm_annot': int(aid),
|
||||||
|
}:
|
||||||
|
# NOTE: this is normally entered on
|
||||||
|
# a client's annotation de-alloc normally
|
||||||
|
# prior to detach or modify.
|
||||||
|
annot: QGraphicsItem = annots[aid]
|
||||||
|
annot.delete()
|
||||||
|
|
||||||
|
# respond to client indicating annot
|
||||||
|
# was indeed deleted.
|
||||||
|
await annot_req_stream.send(aid)
|
||||||
|
|
||||||
|
case {
|
||||||
|
'fqme': fqme,
|
||||||
|
'render': int(aid),
|
||||||
|
'viz_name': str(viz_name),
|
||||||
|
'timeframe': timeframe,
|
||||||
|
}:
|
||||||
|
# | {
|
||||||
|
# 'backfilling': (str(viz_name), timeframe),
|
||||||
|
# }:
|
||||||
|
ds: DisplayState = _dss[viz_name]
|
||||||
|
chart: ChartPlotWidget = {
|
||||||
|
60: ds.hist_chart,
|
||||||
|
1: ds.chart,
|
||||||
|
}[timeframe]
|
||||||
|
|
||||||
|
case _:
|
||||||
|
log.error(
|
||||||
|
'Unknown remote annotation cmd:\n'
|
||||||
|
f'{pformat(msg)}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
@ -74,62 +185,29 @@ async def remote_annotate(
|
||||||
global _dss, _ctxs
|
global _dss, _ctxs
|
||||||
assert _dss
|
assert _dss
|
||||||
|
|
||||||
_ctxs.append(ctx)
|
_ctxs[ctx.cid] = (ctx, set())
|
||||||
|
|
||||||
# send back full fqme symbology to caller
|
# send back full fqme symbology to caller
|
||||||
await ctx.started(list(_dss))
|
await ctx.started(list(_dss))
|
||||||
|
|
||||||
|
# open annot request handler stream
|
||||||
async with ctx.open_stream() as annot_req_stream:
|
async with ctx.open_stream() as annot_req_stream:
|
||||||
async for msg in annot_req_stream:
|
try:
|
||||||
match msg:
|
await serve_rc_annots(
|
||||||
case {
|
ipc_key=ctx.cid,
|
||||||
'annot': 'SelectRect',
|
annot_req_stream=annot_req_stream,
|
||||||
'fqme': fqme,
|
dss=_dss,
|
||||||
'timeframe': timeframe,
|
ctxs=_ctxs,
|
||||||
'meth': str(meth),
|
annots=_annots,
|
||||||
'kwargs': dict(kwargs),
|
)
|
||||||
}:
|
finally:
|
||||||
|
# ensure all annots for this connection are deleted
|
||||||
ds: DisplayState = _dss[fqme]
|
# on any final teardown
|
||||||
chart: ChartPlotWidget = {
|
(_ctx, aids) = _ctxs[ctx.cid]
|
||||||
60: ds.hist_chart,
|
assert _ctx is ctx
|
||||||
1: ds.chart,
|
for aid in aids:
|
||||||
}[timeframe]
|
annot: QGraphicsItem = _annots[aid]
|
||||||
cv: ChartView = chart.cv
|
annot.delete()
|
||||||
|
|
||||||
# sanity
|
|
||||||
if timeframe == 60:
|
|
||||||
assert (
|
|
||||||
chart.linked.godwidget.hist_linked.chart.view
|
|
||||||
is
|
|
||||||
cv
|
|
||||||
)
|
|
||||||
|
|
||||||
# annot type lookup from cmd
|
|
||||||
rect = SelectRect(
|
|
||||||
viewbox=cv,
|
|
||||||
|
|
||||||
# TODO: make this more dynamic?
|
|
||||||
# -[ ] pull from conf.toml?
|
|
||||||
# -[ ] add `.set_color()` method to type?
|
|
||||||
# -[ ] make a green/red based on direction
|
|
||||||
# instead of default static color?
|
|
||||||
color=kwargs.pop('color', None),
|
|
||||||
)
|
|
||||||
# XXX NOTE: this is REQUIRED to set the rect
|
|
||||||
# resize callback!
|
|
||||||
rect.chart: ChartPlotWidget = chart
|
|
||||||
|
|
||||||
# delegate generically to the requested method
|
|
||||||
getattr(rect, meth)(**kwargs)
|
|
||||||
rect.show()
|
|
||||||
await annot_req_stream.send(id(rect))
|
|
||||||
|
|
||||||
case _:
|
|
||||||
log.error(
|
|
||||||
'Unknown remote annotation cmd:\n'
|
|
||||||
f'{pformat(msg)}'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class AnnotCtl(Struct):
|
class AnnotCtl(Struct):
|
||||||
|
@ -142,7 +220,12 @@ class AnnotCtl(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
ctx2fqmes: dict[str, str]
|
ctx2fqmes: dict[str, str]
|
||||||
fqme2stream: dict[str, MsgStream]
|
fqme2ipc: dict[str, MsgStream]
|
||||||
|
_annot_stack: AsyncExitStack
|
||||||
|
|
||||||
|
# runtime-populated mapping of all annotation
|
||||||
|
# ids to their equivalent IPC msg-streams.
|
||||||
|
_ipcs: dict[int, MsgStream] = {}
|
||||||
|
|
||||||
async def add_rect(
|
async def add_rect(
|
||||||
self,
|
self,
|
||||||
|
@ -155,13 +238,21 @@ class AnnotCtl(Struct):
|
||||||
domain: str = 'view', # or 'scene'
|
domain: str = 'view', # or 'scene'
|
||||||
color: str = 'dad_blue',
|
color: str = 'dad_blue',
|
||||||
|
|
||||||
|
from_acm: bool = False,
|
||||||
|
|
||||||
) -> int:
|
) -> int:
|
||||||
'''
|
'''
|
||||||
Add a `SelectRect` annotation to the target view, return
|
Add a `SelectRect` annotation to the target view, return
|
||||||
the instances `id(obj)` from the remote UI actor.
|
the instances `id(obj)` from the remote UI actor.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
ipc: MsgStream = self.fqme2stream[fqme]
|
ipc: MsgStream = self.fqme2ipc.get(fqme)
|
||||||
|
if ipc is None:
|
||||||
|
raise SymbolNotFound(
|
||||||
|
'No chart (actor) seems to have mkt feed loaded?\n'
|
||||||
|
f'{fqme}'
|
||||||
|
)
|
||||||
|
|
||||||
await ipc.send({
|
await ipc.send({
|
||||||
'fqme': fqme,
|
'fqme': fqme,
|
||||||
'annot': 'SelectRect',
|
'annot': 'SelectRect',
|
||||||
|
@ -175,32 +266,61 @@ class AnnotCtl(Struct):
|
||||||
'update_label': False,
|
'update_label': False,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
return (await ipc.receive())
|
aid: int = await ipc.receive()
|
||||||
|
self._ipcs[aid] = ipc
|
||||||
async def modify(
|
if not from_acm:
|
||||||
self,
|
self._annot_stack.push_async_callback(
|
||||||
aid: int, # annotation id
|
partial(
|
||||||
meth: str, # far end graphics object method to invoke
|
self.remove,
|
||||||
params: dict[str, Any], # far end `meth(**kwargs)`
|
aid,
|
||||||
) -> bool:
|
)
|
||||||
'''
|
)
|
||||||
Modify an existing (remote) annotation's graphics
|
return aid
|
||||||
paramters, thus changing it's appearance / state in real
|
|
||||||
time.
|
|
||||||
|
|
||||||
'''
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
async def remove(
|
async def remove(
|
||||||
self,
|
self,
|
||||||
uid: int,
|
aid: int,
|
||||||
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
'''
|
'''
|
||||||
Remove an existing annotation by instance id.
|
Remove an existing annotation by instance id.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
raise NotImplementedError
|
ipc: MsgStream = self._ipcs[aid]
|
||||||
|
await ipc.send({
|
||||||
|
'rm_annot': aid,
|
||||||
|
})
|
||||||
|
removed: bool = await ipc.receive()
|
||||||
|
return removed
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def open_rect(
|
||||||
|
self,
|
||||||
|
**kwargs,
|
||||||
|
) -> int:
|
||||||
|
try:
|
||||||
|
aid: int = await self.add_rect(
|
||||||
|
from_acm=True,
|
||||||
|
**kwargs,
|
||||||
|
)
|
||||||
|
yield aid
|
||||||
|
finally:
|
||||||
|
await self.remove(aid)
|
||||||
|
|
||||||
|
# TODO: do we even need this?
|
||||||
|
# async def modify(
|
||||||
|
# self,
|
||||||
|
# aid: int, # annotation id
|
||||||
|
# meth: str, # far end graphics object method to invoke
|
||||||
|
# params: dict[str, Any], # far end `meth(**kwargs)`
|
||||||
|
# ) -> bool:
|
||||||
|
# '''
|
||||||
|
# Modify an existing (remote) annotation's graphics
|
||||||
|
# paramters, thus changing it's appearance / state in real
|
||||||
|
# time.
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
@ -235,20 +355,18 @@ async def open_annot_ctl(
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx2fqmes: dict[str, set[str]] = {}
|
ctx2fqmes: dict[str, set[str]] = {}
|
||||||
fqme2stream: dict[str, MsgStream] = {}
|
fqme2ipc: dict[str, MsgStream] = {}
|
||||||
client = AnnotCtl(
|
|
||||||
ctx2fqmes=ctx2fqmes,
|
|
||||||
fqme2stream=fqme2stream,
|
|
||||||
)
|
|
||||||
|
|
||||||
stream_ctxs: list[AsyncContextManager] = []
|
stream_ctxs: list[AsyncContextManager] = []
|
||||||
async with trionics.gather_contexts(ctx_mngrs) as ctxs:
|
|
||||||
|
async with (
|
||||||
|
trionics.gather_contexts(ctx_mngrs) as ctxs,
|
||||||
|
):
|
||||||
for (ctx, fqmes) in ctxs:
|
for (ctx, fqmes) in ctxs:
|
||||||
stream_ctxs.append(ctx.open_stream())
|
stream_ctxs.append(ctx.open_stream())
|
||||||
|
|
||||||
# fill lookup table of mkt addrs to IPC ctxs
|
# fill lookup table of mkt addrs to IPC ctxs
|
||||||
for fqme in fqmes:
|
for fqme in fqmes:
|
||||||
if other := fqme2stream.get(fqme):
|
if other := fqme2ipc.get(fqme):
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f'More then one chart displays {fqme}!?\n'
|
f'More then one chart displays {fqme}!?\n'
|
||||||
'Other UI actor info:\n'
|
'Other UI actor info:\n'
|
||||||
|
@ -266,8 +384,19 @@ async def open_annot_ctl(
|
||||||
for stream in streams:
|
for stream in streams:
|
||||||
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
|
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
|
||||||
for fqme in fqmes:
|
for fqme in fqmes:
|
||||||
fqme2stream[fqme] = stream
|
fqme2ipc[fqme] = stream
|
||||||
|
|
||||||
yield client
|
# NOTE: on graceful teardown we always attempt to
|
||||||
# TODO: on graceful teardown should we try to
|
# remove all annots that were created by the
|
||||||
# remove all annots that were created/modded?
|
# entering client.
|
||||||
|
# TODO: should we maybe instead/also do this on the
|
||||||
|
# server-actor side so that when a client
|
||||||
|
# disconnects we always delete all annotations by
|
||||||
|
# default instaead of expecting the client to?
|
||||||
|
async with AsyncExitStack() as annots_stack:
|
||||||
|
client = AnnotCtl(
|
||||||
|
ctx2fqmes=ctx2fqmes,
|
||||||
|
fqme2ipc=fqme2ipc,
|
||||||
|
_annot_stack=annots_stack,
|
||||||
|
)
|
||||||
|
yield client
|
||||||
|
|
Loading…
Reference in New Issue