Compare commits


No commits in common. "3d03781810707520a82278ea541f80c6178b7074" and "9be29a707d60095aadd72d87851f0644a46a8483" have entirely different histories.

17 changed files with 316 additions and 594 deletions

View File

@@ -201,7 +201,6 @@ class FutesPair(Pair):
match contype:
case (
'CURRENT_QUARTER'
| 'CURRENT_QUARTER DELIVERING'
| 'NEXT_QUARTER' # su madre binance..
):
pair, _, expiry = symbol.partition('_')
@@ -221,10 +220,6 @@ class FutesPair(Pair):
case ['DEFI']:
return 'PERP'
# wow, just wow you binance guys suck..
if self.status == 'PENDING_TRADING':
return 'PENDING'
# XXX: yeah no clue then..
raise ValueError(
f'Bad .expiry token match: {contype} for {symbol}'
@@ -242,7 +237,6 @@ class FutesPair(Pair):
case (
'CURRENT_QUARTER'
| 'CURRENT_QUARTER DELIVERING'
| 'NEXT_QUARTER' # su madre binance..
):
_, _, expiry = symbol.partition('_')
@@ -255,10 +249,7 @@ class FutesPair(Pair):
return f'{margin}M'
match subtype:
case (
['DEFI']
| ['USDC']
):
case ['DEFI']:
return f'{subtype[0]}'
# XXX: yeah no clue then..
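For context on the quarterly-contract parsing above: `str.partition` splits on the first occurrence of the separator and always returns a 3-tuple, which is why both branches can unpack `symbol.partition('_')` directly. A quick sketch, with a made-up symbol value:

# hypothetical quarterly futes symbol, purely for illustration
symbol: str = 'BTCUSDT_231229'
pair, _, expiry = symbol.partition('_')
assert (pair, expiry) == ('BTCUSDT', '231229')

# a symbol with no '_' separator still unpacks safely:
pair, _, expiry = 'BTCUSDT'.partition('_')
assert (pair, expiry) == ('BTCUSDT', '')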

View File

@@ -482,7 +482,7 @@ def search(
):
return await func()
from piker.toolz import open_crash_handler
from cornerboi._debug import open_crash_handler
with open_crash_handler():
quotes = trio.run(
main,
@@ -506,11 +506,9 @@ def search(
@click.option('--delete', '-d', flag_value=True, help='Delete section')
@click.pass_obj
def brokercfg(config, section, value, delete):
'''
If invoked with no arguments, open an editor to edit broker
configs file or get / update an individual section.
'''
"""If invoked with no arguments, open an editor to edit broker configs file
or get / update an individual section.
"""
from .. import config
if section:

View File

@@ -434,8 +434,11 @@ async def get_bars(
# current impl) to detect a cancel case.
# timeout=timeout,
)
# usually either a request during a venue closure
# or into a large (weekend) closure gap.
# not enough bars signal, likely due to venue
# operational gaps.
# too_little: bool = False
if end_dt:
if not bars:
# no data returned?
log.warning(
@@ -444,13 +447,14 @@ async def get_bars(
f'end_dt: {end_dt}\n'
f'duration: {dt_duration}\n'
)
# NOTE: REQUIRED to pass back value..
result = None
return None
# raise NoData(
# f'{fqme}\n'
# f'end_dt:{end_dt}\n'
# )
# not enough bars signal, likely due to venue
# operational gaps.
if end_dt:
else:
dur_s: float = len(bars) * timeframe
bars_dur = Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds()
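The "not enough bars" check hinted at by the comments above boils down to simple duration arithmetic with `pendulum`; a hedged sketch with illustrative values (not the backend's actual thresholds):

from pendulum import Duration

timeframe: float = 60            # 1m bars
bars_returned: int = 59          # hypothetical short frame
dt_duration = Duration(hours=1)  # the span that was requested

dur_s: float = bars_returned * timeframe    # 3540.0
bars_dur = Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds()  # 3600.0

# the frame covers less time than requested -> likely a venue gap
assert bars_dur.in_seconds() < dt_dur_s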

View File

@@ -70,18 +70,12 @@ _symbol_info_translation: dict[str, str] = {
def get_config() -> dict[str, Any]:
'''
Load our section from `piker/brokers.toml`.
'''
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
if (section := conf.get('kraken')) is None:
log.warning(
f'No config section found for kraken in {path}'
)
conf, path = config.load()
section = conf.get('kraken')
if section is None:
log.warning(f'No config section found for kraken in {path}')
return {}
return section

View File

@@ -26,7 +26,6 @@ from contextlib import asynccontextmanager as acm
from datetime import datetime
from operator import itemgetter
import itertools
from pprint import pformat
import time
from typing import (
Callable,
@@ -698,12 +697,7 @@ async def open_trade_dialog(
# sanity check all the mkt infos
for fqme, flume in feed.flumes.items():
mkt: MktPair = symcache.mktmaps.get(fqme) or mkt_by_fqme[fqme]
if mkt != flume.mkt:
diff: tuple = mkt - flume.mkt
log.warning(
'MktPair sig mismatch?\n'
f'{pformat(diff)}'
)
assert mkt == flume.mkt
get_cost: Callable = getattr(
brokermod,

View File

@@ -14,45 +14,49 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Actor runtime primtives and (distributed) service APIs for,
"""
Actor-runtime service orchestration machinery.
- daemon-service mgmt: `_daemon` (i.e. low-level spawn and supervise machinery
for sub-actors like `brokerd`, `emsd`, datad`, etc.)
"""
from __future__ import annotations
- service-actor supervision (via `trio` tasks) API: `._mngr`
- discovery interface (via light wrapping around `tractor`'s built-in
prot): `._registry`
- `docker` cntr SC supervision for use with `trio`: `_ahab`
- wrappers for marketstore and elasticsearch dbs
=> TODO: maybe to (re)move elsewhere?
'''
from ._mngr import Services as Services
from ._registry import (
_tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr,
_default_registry_host as _default_registry_host,
_default_registry_port as _default_registry_port,
open_registry as open_registry,
find_service as find_service,
check_for_service as check_for_service,
from ._mngr import Services
from ._registry import ( # noqa
_tractor_kwargs,
_default_reg_addr,
_default_registry_host,
_default_registry_port,
open_registry,
find_service,
check_for_service,
)
from ._daemon import (
maybe_spawn_daemon as maybe_spawn_daemon,
spawn_emsd as spawn_emsd,
maybe_open_emsd as maybe_open_emsd,
from ._daemon import ( # noqa
maybe_spawn_daemon,
spawn_emsd,
maybe_open_emsd,
)
from ._actor_runtime import (
open_piker_runtime as open_piker_runtime,
maybe_open_pikerd as maybe_open_pikerd,
open_pikerd as open_pikerd,
get_runtime_vars as get_runtime_vars,
open_piker_runtime,
maybe_open_pikerd,
open_pikerd,
get_runtime_vars,
)
from ..brokers._daemon import (
spawn_brokerd as spawn_brokerd,
maybe_spawn_brokerd as maybe_spawn_brokerd,
spawn_brokerd,
maybe_spawn_brokerd,
)
__all__ = [
'check_for_service',
'Services',
'maybe_spawn_daemon',
'spawn_brokerd',
'maybe_spawn_brokerd',
'spawn_emsd',
'maybe_open_emsd',
'open_piker_runtime',
'maybe_open_pikerd',
'open_pikerd',
'get_runtime_vars',
]
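Both sides of this `__init__.py` hunk are solving the same problem: marking names imported into the package namespace as intentional public re-exports. The redundant-alias form (`from mod import name as name`) is what strict type checkers (e.g. `mypy --no-implicit-reexport`) treat as an explicit re-export, while the other side pairs plain imports tagged `# noqa` with an `__all__` list. A minimal sketch of the two idioms side by side (names are illustrative):

# idiom A: redundant alias == explicit re-export for type checkers
from ._mngr import Services as Services

# idiom B: plain import (linter silenced) + declared public surface
from ._mngr import Services  # noqa
__all__ = ['Services']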

View File

@@ -100,7 +100,7 @@ async def open_piker_runtime(
or [_default_reg_addr]
)
if ems := tractor_kwargs.pop('enable_modules', None):
if ems := tractor_kwargs.get('enable_modules'):
# import pdbp; pdbp.set_trace()
enable_modules.extend(ems)
@@ -175,20 +175,14 @@ async def open_pikerd(
alive underling services (see below).
'''
# NOTE: for the root daemon we always enable the root
# mod set and we `list.extend()` it into wtv the
# caller requested.
# TODO: make this mod set more strict?
# -[ ] eventually we should be able to avoid
# having the root have more then permissions to spawn other
# specialized daemons I think?
ems: list[str] = kwargs.setdefault('enable_modules', [])
ems.extend(_root_modules)
async with (
open_piker_runtime(
name=_root_dname,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
loglevel=loglevel,
debug_mode=debug_mode,
registry_addrs=registry_addrs,

View File

@@ -15,8 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Supervisor for ``docker`` with included async and SC wrapping to
ensure a cancellable container lifetime system.
Supervisor for ``docker`` with included async and SC wrapping
to ensure a cancellable container lifetime system.
'''
from __future__ import annotations

View File

@@ -27,12 +27,6 @@ from typing import (
import trio
from trio_typing import TaskStatus
import tractor
from tractor import (
current_actor,
ContextCancelled,
Context,
Portal,
)
from ._util import (
log, # sub-sys logger
@@ -44,8 +38,6 @@ from ._util import (
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
class Services:
actor_n: tractor._supervise.ActorNursery
@@ -55,7 +47,7 @@ class Services:
str,
tuple[
trio.CancelScope,
Portal,
tractor.Portal,
trio.Event,
]
] = {}
@@ -65,12 +57,12 @@
async def start_service_task(
self,
name: str,
portal: Portal,
portal: tractor.Portal,
target: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> (trio.CancelScope, Context):
) -> (trio.CancelScope, tractor.Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
@@ -109,30 +101,13 @@ class Services:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res: Any = await ctx.result()
ctx_res = await ctx.result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (await portal.result(), ctx_res)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.channel.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
)
else:
raise
finally:
await portal.cancel_actor()

View File

@@ -27,7 +27,6 @@ from typing import (
)
import tractor
from tractor import Portal
from ._util import (
log, # sub-sys logger
@@ -141,11 +140,7 @@ async def find_service(
first_only: bool = True,
) -> (
Portal
| list[Portal]
| None
):
) -> tractor.Portal | None:
reg_addrs: list[tuple[str, int]]
async with open_registry(
@@ -158,9 +153,6 @@ async def find_service(
),
) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
maybe_portals: list[Portal] | Portal | None
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,

View File

@@ -24,13 +24,8 @@ from __future__ import annotations
# AsyncExitStack,
# )
from pathlib import Path
from math import copysign
import time
from types import ModuleType
from typing import (
Any,
TYPE_CHECKING,
)
import polars as pl
import numpy as np
@@ -47,17 +42,15 @@ from piker.data import (
ShmArray,
)
from piker import tsp
from piker.data._formatters import BGM
from . import log
from . import (
log,
)
from . import (
__tsdbs__,
open_storage_client,
StorageClient,
)
if TYPE_CHECKING:
from piker.ui._remote_ctl import AnnotCtl
store = typer.Typer()
@@ -210,12 +203,10 @@ def anal(
deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
deduped,
diff,
) = tsp.dedupe(
shm_df,
period=period,
)
) = tsp.dedupe(shm_df)
write_edits: bool = True
if (
@@ -226,6 +217,7 @@ def anal(
)
):
await tractor.pause()
await client.write_ohlcv(
fqme,
ohlcv=deduped,
@@ -237,122 +229,10 @@ def anal(
# is there something more minimal but nearly as
# functional as ipython?
await tractor.pause()
assert not null_segs
trio.run(main)
async def markup_gaps(
fqme: str,
timeframe: float,
actl: AnnotCtl,
wdts: pl.DataFrame,
gaps: pl.DataFrame,
) -> dict[int, dict]:
'''
Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
with rectangles.
'''
aids: dict[int] = {}
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
# the gap's RIGHT-most bar's OPEN value
# at that time (sample) step.
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# dt_end_t: float = dt.timestamp()
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's LEFT-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = wdts.filter(
pl.col('index') == iend - 1
)
# XXX: probably a gap in the (newly sorted or de-duplicated)
# dt-df, so we might need to re-index first..
if prev_r.is_empty():
await tractor.pause()
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
# gap_w: float = abs((iend - istart))
# if gap_w < 6:
# margin: float = 6
# iend += margin
# istart -= margin
rect_gap: float = BGM*3/8
opn: float = row['open'][0]
ro: tuple[float, float] = (
# dt_end_t,
iend + rect_gap + 1,
opn,
)
cls: float = prev_r['close'][0]
lc: tuple[float, float] = (
# dt_start_t,
istart - rect_gap, # + 1 ,
cls,
)
color: str = 'dad_blue'
diff: float = cls - opn
sgn: float = copysign(1, diff)
color: str = {
-1: 'buy_green',
1: 'sell_red',
}[sgn]
rect_kwargs: dict[str, Any] = dict(
fqme=fqme,
timeframe=timeframe,
start_pos=lc,
end_pos=ro,
color=color,
)
aid: int = await actl.add_rect(**rect_kwargs)
assert aid
aids[aid] = rect_kwargs
# tell chart to redraw all its
# graphics view layers Bo
await actl.redraw(
fqme=fqme,
timeframe=timeframe,
)
return aids
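The up/down coloring above relies on `math.copysign` returning a signed `1.0`, which then keys a two-entry lookup table (a float key hashes equal to its int counterpart, so the `{-1: ..., 1: ...}` dict works). A tiny standalone sketch with made-up gap-edge prices:

from math import copysign

cls, opn = 100.0, 98.5               # hypothetical close / following open
sgn: float = copysign(1, cls - opn)  # -> 1.0 since close > open
color: str = {-1: 'buy_green', 1: 'sell_red'}[sgn]
assert color == 'sell_red'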
@store.command()
def ldshm(
fqme: str,
@@ -369,6 +249,7 @@ def ldshm(
async def main():
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
actl: AnnotCtl
mod: ModuleType
@@ -393,89 +274,111 @@ def ldshm(
shm_df,
) in tsp.iter_dfs_from_shms(fqme):
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
d1: float = float(times[-1] - times[-2])
d2: float = float(times[-2] - times[-3])
med: float = np.median(np.diff(times))
if (
d1 < 1.
and d2 < 1.
and med < 1.
):
period_s: float = float(times[-1] - times[-2])
if period_s < 1.:
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
period_s: float = float(max(d1, d2, med))
# over-write back to shm?
df: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
deduped,
diff,
) = tsp.dedupe(shm_df)
null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)
# TODO: call null-seg fixer somehow?
if null_segs:
await tractor.pause()
# async with (
# trio.open_nursery() as tn,
# mod.open_history_client(
# mkt,
# ) as (get_hist, config),
# ):
# nulls_detected: trio.Event = await tn.start(partial(
# tsp.maybe_fill_null_segments,
needs_correction: bool = (
not gaps.is_empty()
or null_segs
)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if needs_correction:
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
# shm=shm,
# timeframe=timeframe,
# get_hist=get_hist,
# sampler_stream=sampler_stream,
# mkt=mkt,
# ))
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# over-write back to shm?
wdts: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
wdts,
deduped,
diff,
) = tsp.dedupe(
shm_df,
period=period_s,
# the gap's right-most bar's OPEN value
# at that time (sample) step.
# dt_end_t: float = dt.timestamp()
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's left-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = df.filter(
pl.col('index') == iend - 1
)
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
)
# detect gaps from in expected (uniform OHLC) sample period
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
# async with actl.open_rect(
# ) as aid:
aid: int = await actl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
# TODO: by default we always want to mark these up
# with rects showing up/down gaps Bo
venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
# TODO: actually pull the exact duration
# expected for each venue operational period?
gap_dt_unit='days',
gap_thresh=1,
)
# TODO: find the disjoint set of step gaps from
# venue (closure) set!
# -[ ] do a set diff by checking for the unique
# gap set only in the step_gaps?
# write to parquet file?
if (
not venue_gaps.is_empty()
or (
period_s < 60
and not step_gaps.is_empty()
)
write_parquet
):
# write repaired ts to parquet-file?
if write_parquet:
start: float = time.time()
# write to fs
start = time.time()
path: Path = await client.write_ohlcv(
fqme,
ohlcv=deduped,
@@ -487,7 +390,7 @@ def ldshm(
)
# read back from fs
start: float = time.time()
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
read_delay: float = round(
time.time() - start,
@@ -516,32 +419,17 @@ def ldshm(
update_first=False, # don't update ._first
)
do_markup_gaps: bool = True
if do_markup_gaps:
new_df: pl.DataFrame = tsp.np2pl(new)
aids: dict = await markup_gaps(
fqme,
period_s,
actl,
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
await tractor.pause()
assert aids
assert diff
else:
# allow interaction even when no ts problems.
await tractor.pause()
# assert not diff
assert not diff
if shm_df is None:
log.error(
f'No matching shm buffers for {fqme} ?'
)
if df is None:
log.error(f'No matching shm buffers for {fqme} ?')
trio.run(main)
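Both storage CLI commands in this file share the same shape: a synchronous `typer` command defines a local `async def main()` and hands it to `trio.run()`. A stripped-down sketch of that pattern (the command body is a placeholder):

import trio
import typer

store = typer.Typer()

@store.command()
def ldshm(fqme: str) -> None:
    async def main():
        # async storage / annotation work happens here in the real command
        await trio.sleep(0)

    trio.run(main)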

View File

@@ -18,12 +18,24 @@
Toolz for debug, profile and trace of the distributed runtime :surfer:
'''
from tractor.devx import (
open_crash_handler as open_crash_handler,
from .debug import (
open_crash_handler,
)
from .profile import (
Profiler as Profiler,
pg_profile_enabled as pg_profile_enabled,
ms_slower_then as ms_slower_then,
timeit as timeit,
Profiler,
pg_profile_enabled,
ms_slower_then,
timeit,
)
# TODO: other mods to include?
# - DROP .trionics, already moved into tractor
# - move in `piker.calc`
__all__: list[str] = [
'open_crash_handler',
'pg_profile_enabled',
'ms_slower_then',
'Profiler',
'timeit',
]

View File

@@ -0,0 +1,40 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Debugger wrappers for `pdbp` as used by `tractor`.
'''
from contextlib import contextmanager as cm
import pdbp
# TODO: better naming and what additionals?
# - optional runtime plugging?
# - detection for sync vs. async code?
# - specialized REPL entry when in distributed mode?
@cm
def open_crash_handler():
'''
Super basic crash handler using `pdbp` debugger.
'''
try:
yield
except BaseException:
pdbp.xpm()
raise
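A possible usage sketch for this new wrapper (the wrapped call is hypothetical): any exception raised inside the block drops into a `pdbp` post-mortem via `pdbp.xpm()` before re-raising:

from piker.toolz import open_crash_handler  # re-exported in `toolz/__init__.py` above

with open_crash_handler():
    some_risky_call()  # hypothetical; a crash here lands you in the pdbp REPL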

View File

@@ -51,11 +51,10 @@ from pendulum import (
import numpy as np
import polars as pl
from piker.brokers import NoData
from piker.accounting import (
from ..accounting import (
MktPair,
)
from piker.data._util import (
from ..data._util import (
log,
)
from ..data._sharedmem import (
@@ -303,28 +302,16 @@ async def maybe_fill_null_segments(
gap: np.ndarray = shm._array[istart:istop]
# copy the oldest OHLC samples forward
cls: float = shm._array[istart]['close']
# TODO: how can we mark this range as being a gap tho?
# -[ ] maybe pg finally supports nulls in ndarray to
# show empty space somehow?
# -[ ] we could put a special value in the vlm or
# another col/field to denote?
gap[ohlc_fields] = cls
gap[ohlc_fields] = shm._array[istart]['close']
start_t: float = shm._array[istart]['time']
t_diff: float = (istop - istart)*timeframe
gap['time'] = np.arange(
start=start_t,
stop=start_t + t_diff,
step=timeframe,
)
# TODO: reimpl using the new `.ui._remote_ctl` ctx
# ideally using some kinda decent
# tractory-reverse-lookup-connnection from some other
# `Context` type thingy?
await sampler_stream.send({
'broadcast_all': {
@@ -345,11 +332,11 @@ async def maybe_fill_null_segments(
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# await tractor.pause()
async def start_backfill(
get_hist,
frame_types: dict[str, Duration] | None,
mod: ModuleType,
mkt: MktPair,
shm: ShmArray,
@@ -394,6 +381,7 @@ async def start_backfill(
60: {'years': 6},
}
period_duration: int = periods[timeframe]
update_start_on_prepend = True
# NOTE: manually set the "latest" datetime which we intend to
@@ -427,28 +415,6 @@ async def start_backfill(
end_dt=last_start_dt,
)
except NoData as _daterr:
# 3 cases:
# - frame in the middle of a legit venue gap
# - history actually began at the `last_start_dt`
# - some other unknown error (ib blocking the
# history bc they don't want you seeing how they
# cucked all the tinas..)
if dur := frame_types.get(timeframe):
# decrement by a frame's worth of duration and
# retry a few times.
last_start_dt.subtract(
seconds=dur.total_seconds()
)
log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
)
continue
# broker says there never was or is no more history to pull
except DataUnavailable:
log.warning(
@@ -905,7 +871,7 @@ async def tsdb_backfill(
):
log.info(
f'`{mod}` history client returned backfill config:\n'
f'{pformat(config)}\n'
f'{config}\n'
)
dt_eps: list[DateTime, DateTime] = []
@@ -977,7 +943,6 @@ async def tsdb_backfill(
partial(
start_backfill,
get_hist=get_hist,
frame_types=config.get('frame_types', None),
mod=mod,
mkt=mkt,
shm=shm,

View File

@@ -21,16 +21,15 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
types.
'''
from __future__ import annotations
from collections import UserList
from pprint import (
saferepr,
pformat,
)
from typing import Any
from msgspec import (
msgpack,
Struct as _Struct,
Struct,
structs,
)
@@ -63,7 +62,7 @@ class DiffDump(UserList):
class Struct(
_Struct,
Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
@@ -73,27 +72,9 @@ class Struct(
A "human friendlier" (aka repl buddy) struct subtype.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict(
self,
include_non_members: bool = True,
) -> dict:
'''
Like it sounds.. direct delegation to:
@@ -109,72 +90,16 @@ class Struct(
# only return a dict of the struct members
# which were provided as input, NOT anything
# added as type-defined `@property` methods!
# added as `@properties`!
sin_props: dict = {}
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
sin_props[k] = asdict[k]
for fi in structs.fields(self):
key: str = fi.name
sin_props[key] = asdict[key]
return sin_props
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# __str__ = __repr__ = pformat
__repr__ = pformat
def pformat(self) -> str:
return f'Struct({pformat(self.to_dict())})'
def copy(
self,
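Both the `_sin_props()` helper and the `to_dict()` filtering in this hunk lean on `msgspec.structs.fields()`, which yields only declared struct members and never `@property` attributes. A standalone sketch of that introspection (the `Point` type is made up):

from msgspec import Struct, structs

class Point(Struct):
    x: int
    y: int = 0

p = Point(1)
for fi in structs.fields(p):
    # fi.name/.type come from the struct definition, not runtime attrs
    print(fi.name, getattr(p, fi.name))
# prints: x 1, then y 0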

View File

@@ -50,16 +50,15 @@ from ._display import DisplayState
from ._interaction import ChartView
from ._editors import SelectRect
from ._chart import ChartPlotWidget
from ._dataviz import Viz
log = get_logger(__name__)
# NOTE: this is UPDATED by the `._display.graphics_update_loop()`
# once all chart widgets / Viz per flume have been initialized
# allowing for remote annotation (control) of any chart-actor's mkt
# feed by fqme lookup Bo
_dss: dict[str, DisplayState] = {}
# NOTE: this is set by the `._display.graphics_update_loop()` once
# all chart widgets / Viz per flume have been initialized allowing
# for remote annotation (control) of any chart-actor's mkt feed by
# fqme lookup Bo
_dss: dict[str, DisplayState] | None = None
# stash each and every client connection so that they can all
# be cancelled on shutdown/error.
@@ -97,7 +96,7 @@ async def serve_rc_annots(
async for msg in annot_req_stream:
match msg:
case {
'cmd': 'SelectRect',
'annot': 'SelectRect',
'fqme': fqme,
'timeframe': timeframe,
'meth': str(meth),
@@ -111,6 +110,14 @@ async def serve_rc_annots(
}[timeframe]
cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
@@ -136,8 +143,7 @@ async def serve_rc_annots(
await annot_req_stream.send(aid)
case {
'cmd': 'remove',
'aid': int(aid),
'rm_annot': int(aid),
}:
# NOTE: this is normally entered on
# a client's annotation de-alloc normally
@@ -150,29 +156,19 @@ async def serve_rc_annots(
await annot_req_stream.send(aid)
case {
'cmd': 'redraw',
'fqme': fqme,
'render': int(aid),
'viz_name': str(viz_name),
'timeframe': timeframe,
# TODO: maybe more fields?
# 'render': int(aid),
# 'viz_name': str(viz_name),
}:
# NOTE: old match from the 60s display loop task
# | {
# 'backfilling': (str(viz_name), timeframe),
# }:
ds: DisplayState = _dss[fqme]
viz: Viz = {
60: ds.hist_viz,
1: ds.viz,
ds: DisplayState = _dss[viz_name]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
log.warning(
f'Forcing VIZ REDRAW:\n'
f'fqme: {fqme}\n'
f'timeframe: {timeframe}\n'
)
viz.reset_graphics()
case _:
log.error(
@@ -231,18 +227,6 @@ class AnnotCtl(Struct):
# ids to their equivalent IPC msg-streams.
_ipcs: dict[int, MsgStream] = {}
def _get_ipc(
self,
fqme: str,
) -> MsgStream:
ipc: MsgStream = self.fqme2ipc.get(fqme)
if ipc is None:
raise SymbolNotFound(
'No chart (actor) seems to have mkt feed loaded?\n'
f'{fqme}'
)
return ipc
async def add_rect(
self,
fqme: str,
@@ -262,10 +246,16 @@ class AnnotCtl(Struct):
the instances `id(obj)` from the remote UI actor.
'''
ipc: MsgStream = self._get_ipc(fqme)
ipc: MsgStream = self.fqme2ipc.get(fqme)
if ipc is None:
raise SymbolNotFound(
'No chart (actor) seems to have mkt feed loaded?\n'
f'{fqme}'
)
await ipc.send({
'fqme': fqme,
'cmd': 'SelectRect',
'annot': 'SelectRect',
'timeframe': timeframe,
# 'meth': str(meth),
'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos',
@@ -298,8 +288,7 @@ class AnnotCtl(Struct):
'''
ipc: MsgStream = self._ipcs[aid]
await ipc.send({
'cmd': 'remove',
'aid': aid,
'rm_annot': aid,
})
removed: bool = await ipc.receive()
return removed
@@ -318,19 +307,6 @@ class AnnotCtl(Struct):
finally:
await self.remove(aid)
async def redraw(
self,
fqme: str,
timeframe: float,
) -> None:
await self._get_ipc(fqme).send({
'cmd': 'redraw',
'fqme': fqme,
# 'render': int(aid),
# 'viz_name': str(viz_name),
'timeframe': timeframe,
})
# TODO: do we even need this?
# async def modify(
# self,
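For reference on the request protocol used by `serve_rc_annots()` above: messages are plain dicts dispatched with structural pattern matching. A minimal, self-contained sketch of that dispatch style, following the `'cmd'`-keyed message shape from one side of the hunks above:

def dispatch(msg: dict) -> str:
    match msg:
        case {'cmd': 'SelectRect', 'fqme': str(fqme), 'timeframe': timeframe}:
            return f'add rect on {fqme}@{timeframe}'
        case {'cmd': 'remove', 'aid': int(aid)}:
            return f'remove annot {aid}'
        case {'cmd': 'redraw', 'fqme': str(fqme), 'timeframe': timeframe}:
            return f'redraw {fqme}@{timeframe}'
        case _:
            return 'unknown msg'

assert dispatch({'cmd': 'remove', 'aid': 3}) == 'remove annot 3'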

View File

@@ -358,7 +358,7 @@ class OrderMode:
send_msg: bool = True,
order: Order | None = None,
) -> Dialog | None:
) -> Dialog:
'''
Send execution order to EMS return a level line to
represent the order on a chart.
@@ -378,16 +378,6 @@ class OrderMode:
'oid': oid,
})
if order.price <= 0:
log.error(
'*!? Invalid `Order.price <= 0` ?!*\n'
# TODO: make this present multi-line in object form
# like `ib_insync.contracts.Contract.__repr__()`
f'{order}\n'
)
self.cancel_orders([order.oid])
return None
lines = self.lines_from_order(
order,
show_markers=True,
@@ -673,7 +663,7 @@ class OrderMode:
self,
msg: Status,
) -> Dialog | None:
) -> Dialog:
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
order = msg.req
@@ -704,15 +694,12 @@ class OrderMode:
fqsn=fqme,
info={},
)
maybe_dialog: Dialog | None = self.submit_order(
dialog = self.submit_order(
send_msg=False,
order=order,
)
if maybe_dialog is None:
return None
assert self.dialogs[oid] == maybe_dialog
return maybe_dialog
assert self.dialogs[oid] == dialog
return dialog
@asynccontextmanager
@@ -1092,24 +1079,7 @@ async def process_trade_msg(
)
):
msg.req = order
dialog: (
Dialog
# NOTE: on an invalid order submission (eg.
# price <=0) the downstream APIs may return
# a null.
| None
) = mode.load_unknown_dialog_from_msg(msg)
# cancel any invalid pre-existing order!
if dialog is None:
log.warning(
'Order was ignored/invalid?\n'
f'{order}'
)
# if valid, display the order line the same as if
# it was submitted during this UI session.
else:
dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid)
case Status(resp='error'):