Compare commits

...

14 Commits

Author SHA1 Message Date
Tyler Goodlet 3d03781810 Impl a sane (with nesting) `.types.Struct.pformat()`
Such that our internal structs can be pretty printed with indented and
type-hinted fields, AND for nested `Struct`-fields call `.pformat()` but
avoiding any recursion errors using `pprint.saferepr()`. Add
a `._sin_props()` iterator over the non-property fields; use it for
`dict` casting when called with `.to_dict(include_non_members=False)`.

Actually, we should also probably figure out how to only pprint like
when required by the user in a REPL or log msg by context-selectively
`pprint.PrettyPrinter` right? Also, if we can generalize decently enough
it'd be cool to maybe patch this in as a util to upstream `msgspec`?
2024-01-17 15:50:27 -05:00
Tyler Goodlet 83d1f117a8 Always cancel (loaded) zero-priced orders
Ran into this trading a peenee where a dark got (mistakenly) submitted
at a price of 0 and then that consecutively broke upstream allocator/pp
code due to a divide-by-zero.. So instead always check for a zero-price
(since that should never ever be valid in any market) and instead cancel
any such order in the EMS and return `None` so that upstream callers can
ignore it without crash handling.
2024-01-17 10:29:43 -05:00
Tyler Goodlet e4ce79f720 Delegate `.toolz.open_crash_handler()` to `tractor.devx`
Means we can drop `.toolz.debug` module outright.
2024-01-16 10:26:38 -05:00
Tyler Goodlet 264246d89b Fix `brokers.toml` load for `kraken` backend 2024-01-10 17:53:15 -05:00
Tyler Goodlet 7c96c9fafe Just warn log on mismatched `MktPair` in paper eng 2024-01-10 17:52:50 -05:00
Tyler Goodlet 52b349fe79 Always reload shm data before annotating gaps, so they line up.. 2024-01-09 15:55:16 -05:00
Tyler Goodlet 6959429af8 Factor gap annotating into new `markup_gaps()`
Since we definitely want to markup gaps that are both data-errors and
just plain old venue closures (time gaps), generalize the `gaps:
pl.DataFrame` loop in a func and call it from the `ldshm` cmd for now.

Some other tweaks to `store ldshm`:
- add `np.median()` failover when detecting (ohlc) ts sample rate.
- use new `tsp.dedupe()` signature.
- differentiate between sample-period size gaps and venue closure gaps
  by calling `tsp.detect_time_gaps()` with diff size thresholds.
- add todo for backfilling null-segs when detected.
2024-01-04 11:01:21 -05:00
Tyler Goodlet 05f874001a Ignore `ContextCancelled`s from non-mngr requests
Since service daemon actors may be cancelled remotely by clients (who
maybe also requested said daemon-actor's spawn in the first place) we
specifically ignore `tractor.ContextCancelled`s from the `ctx.wait()`
inside `Services.start_service_task()` to avoid crashing the service
mngr, and thus for now `pikerd`, (which **does** happen now due to
updated and more explicit remote cancellation semantics implemented in
`tractor`) since the `.canceller` field is not going to match the
`pikerd` uid in such cases!

This explicit check makes sense since the `Services` mngr is built to
allow remote requests to "spawn-n-supervise service actors" where the
services can remain persistent but also cancelled later as requested. We
may want to consider only allowing cancellation by actors who requested
spawn in the future tho?

Also change to more explicit imports to `tractor` types for annots
throughout the sub-pkg.
2024-01-04 10:06:42 -05:00
Tyler Goodlet fc216d37de Drop `__all__` import style from `.services` 2024-01-04 10:05:53 -05:00
Tyler Goodlet 03e429abf8 Extend `enable_modules` from input `tractor_kwargs`
Since certain actors (even if client-like) will want to augment their
module set to provide remote features (such as our new rc annotation
msg-prot for `Qt`-chart actors) we need to ensure we merge in any input
`enable_modules: list[str]` to the value passed to the underlying
`tractor` spawning api. Previously we were passing `_root_modules` as
this value by name, but now instead we simply `list.extend()` that into
whatever is in the `kwargs` both in `open_piker_runtime()` and
`open_pikerd()`.
2024-01-04 09:59:15 -05:00
Tyler Goodlet 7ae7cc829f `tsp`: on backfill, do a smart retry on a `NoData`
Presuming the data provider gives us a config with a `frame_types: dict`
(indicating frame sizes per query/request) we try to be clever and
decrement our submitted `end_dt: DateTime` based on it.. hoping for the
best on the next attempt.
2024-01-03 19:49:41 -05:00
Tyler Goodlet b23d44e21a ib; return `None` on empty bars frame resp so as to trigger raising `NoData` in the caller 2024-01-03 18:16:48 -05:00
Tyler Goodlet 2669db785c Workaround `binance`'s latest API schema bs..
Apparently publishing futures contracts that aren't yet trading AND
changing their contract type `str` format/schema was necessary (such
that's there's a f@#$in space in it..)?

I honestly have no idea where they found their "data engineers" XD

TO CHERRY to #520
2024-01-03 17:50:09 -05:00
Tyler Goodlet d3e7b5cd0e Formalize rc `redraw()` msg-endpoint
So now a chart rc client can ask to invoke the new
`Viz.reset_graphics()` by timeframe and fqme Bo This handy when doing
underlying (real time or tsp) edits and you want to make the UI reflect
the changes incrementally.

Impl deatz:
- tweak the msg schema to use a `cmd:  str` which normally maps to
  (something similar to) the UI method name instead of `annot` and now
  offer 3 such "commands": 'redraw', 'remove', 'SelectRect'.
- impl `AnnotCtl.redraw()` which sends the underlying `msg: dict` on the
  correct `tractor.Msgstream` ipc instance.
  - since ipc-stream lookups now happen in multiple client methods impl
    a private `._get_ipc()` to do the error raise on unknown fqmes.
2024-01-03 17:33:15 -05:00
17 changed files with 594 additions and 316 deletions

View File

@ -201,6 +201,7 @@ class FutesPair(Pair):
match contype:
case (
'CURRENT_QUARTER'
| 'CURRENT_QUARTER DELIVERING'
| 'NEXT_QUARTER' # su madre binance..
):
pair, _, expiry = symbol.partition('_')
@ -220,6 +221,10 @@ class FutesPair(Pair):
case ['DEFI']:
return 'PERP'
# wow, just wow you binance guys suck..
if self.status == 'PENDING_TRADING':
return 'PENDING'
# XXX: yeah no clue then..
raise ValueError(
f'Bad .expiry token match: {contype} for {symbol}'
@ -237,6 +242,7 @@ class FutesPair(Pair):
case (
'CURRENT_QUARTER'
| 'CURRENT_QUARTER DELIVERING'
| 'NEXT_QUARTER' # su madre binance..
):
_, _, expiry = symbol.partition('_')
@ -249,7 +255,10 @@ class FutesPair(Pair):
return f'{margin}M'
match subtype:
case ['DEFI']:
case (
['DEFI']
| ['USDC']
):
return f'{subtype[0]}'
# XXX: yeah no clue then..

View File

@ -482,7 +482,7 @@ def search(
):
return await func()
from cornerboi._debug import open_crash_handler
from piker.toolz import open_crash_handler
with open_crash_handler():
quotes = trio.run(
main,
@ -506,9 +506,11 @@ def search(
@click.option('--delete', '-d', flag_value=True, help='Delete section')
@click.pass_obj
def brokercfg(config, section, value, delete):
"""If invoked with no arguments, open an editor to edit broker configs file
or get / update an individual section.
"""
'''
If invoked with no arguments, open an editor to edit broker
configs file or get / update an individual section.
'''
from .. import config
if section:

View File

@ -434,57 +434,53 @@ async def get_bars(
# current impl) to detect a cancel case.
# timeout=timeout,
)
# usually either a request during a venue closure
# or into a large (weekend) closure gap.
if not bars:
# no data returned?
log.warning(
'History frame is blank?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_duration}\n'
)
# NOTE: REQUIRED to pass back value..
result = None
return None
# not enough bars signal, likely due to venue
# operational gaps.
# too_little: bool = False
if end_dt:
if not bars:
# no data returned?
dur_s: float = len(bars) * timeframe
bars_dur = Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds()
if dur_s < dt_dur_s:
log.warning(
'History frame is blank?\n'
'History frame is shorter then expected?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_duration}\n'
f'duration: {dt_dur_s}\n'
f'frame duration seconds: {dur_s}\n'
f'dur diff: {dt_duration - bars_dur}\n'
)
result = None
return None
# raise NoData(
# f'{fqme}\n'
# f'end_dt:{end_dt}\n'
# )
else:
dur_s: float = len(bars) * timeframe
bars_dur = Duration(seconds=dur_s)
dt_dur_s: float = dt_duration.in_seconds()
if dur_s < dt_dur_s:
log.warning(
'History frame is shorter then expected?\n'
f'start_dt: {start_dt}\n'
f'end_dt: {end_dt}\n'
f'duration: {dt_dur_s}\n'
f'frame duration seconds: {dur_s}\n'
f'dur diff: {dt_duration - bars_dur}\n'
)
# NOTE: we used to try to get a minimal
# set of bars by recursing but this ran
# into possible infinite query loops
# when logic in the `Client.bars()` dt
# diffing went bad. So instead for now
# we just return the
# shorter-then-expected history with
# a warning.
# TODO: in the future it prolly makes
# the most send to do venue operating
# hours lookup and
# timestamp-in-operating-range set
# checking to know for sure if we can
# safely and quickly ignore non-uniform history
# frame timestamp gaps..
# end_dt -= dt_duration
# continue
# await tractor.pause()
# NOTE: we used to try to get a minimal
# set of bars by recursing but this ran
# into possible infinite query loops
# when logic in the `Client.bars()` dt
# diffing went bad. So instead for now
# we just return the
# shorter-then-expected history with
# a warning.
# TODO: in the future it prolly makes
# the most send to do venue operating
# hours lookup and
# timestamp-in-operating-range set
# checking to know for sure if we can
# safely and quickly ignore non-uniform history
# frame timestamp gaps..
# end_dt -= dt_duration
# continue
# await tractor.pause()
first_dt = from_timestamp(
bars[0].date.timestamp())

View File

@ -70,12 +70,18 @@ _symbol_info_translation: dict[str, str] = {
def get_config() -> dict[str, Any]:
'''
Load our section from `piker/brokers.toml`.
conf, path = config.load()
section = conf.get('kraken')
if section is None:
log.warning(f'No config section found for kraken in {path}')
'''
conf, path = config.load(
conf_name='brokers',
touch_if_dne=True,
)
if (section := conf.get('kraken')) is None:
log.warning(
f'No config section found for kraken in {path}'
)
return {}
return section

View File

@ -26,6 +26,7 @@ from contextlib import asynccontextmanager as acm
from datetime import datetime
from operator import itemgetter
import itertools
from pprint import pformat
import time
from typing import (
Callable,
@ -697,7 +698,12 @@ async def open_trade_dialog(
# sanity check all the mkt infos
for fqme, flume in feed.flumes.items():
mkt: MktPair = symcache.mktmaps.get(fqme) or mkt_by_fqme[fqme]
assert mkt == flume.mkt
if mkt != flume.mkt:
diff: tuple = mkt - flume.mkt
log.warning(
'MktPair sig mismatch?\n'
f'{pformat(diff)}'
)
get_cost: Callable = getattr(
brokermod,

View File

@ -14,49 +14,45 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Actor-runtime service orchestration machinery.
'''
Actor runtime primtives and (distributed) service APIs for,
"""
from __future__ import annotations
- daemon-service mgmt: `_daemon` (i.e. low-level spawn and supervise machinery
for sub-actors like `brokerd`, `emsd`, datad`, etc.)
from ._mngr import Services
from ._registry import ( # noqa
_tractor_kwargs,
_default_reg_addr,
_default_registry_host,
_default_registry_port,
open_registry,
find_service,
check_for_service,
- service-actor supervision (via `trio` tasks) API: `._mngr`
- discovery interface (via light wrapping around `tractor`'s built-in
prot): `._registry`
- `docker` cntr SC supervision for use with `trio`: `_ahab`
- wrappers for marketstore and elasticsearch dbs
=> TODO: maybe to (re)move elsewhere?
'''
from ._mngr import Services as Services
from ._registry import (
_tractor_kwargs as _tractor_kwargs,
_default_reg_addr as _default_reg_addr,
_default_registry_host as _default_registry_host,
_default_registry_port as _default_registry_port,
open_registry as open_registry,
find_service as find_service,
check_for_service as check_for_service,
)
from ._daemon import ( # noqa
maybe_spawn_daemon,
spawn_emsd,
maybe_open_emsd,
from ._daemon import (
maybe_spawn_daemon as maybe_spawn_daemon,
spawn_emsd as spawn_emsd,
maybe_open_emsd as maybe_open_emsd,
)
from ._actor_runtime import (
open_piker_runtime,
maybe_open_pikerd,
open_pikerd,
get_runtime_vars,
open_piker_runtime as open_piker_runtime,
maybe_open_pikerd as maybe_open_pikerd,
open_pikerd as open_pikerd,
get_runtime_vars as get_runtime_vars,
)
from ..brokers._daemon import (
spawn_brokerd,
maybe_spawn_brokerd,
spawn_brokerd as spawn_brokerd,
maybe_spawn_brokerd as maybe_spawn_brokerd,
)
__all__ = [
'check_for_service',
'Services',
'maybe_spawn_daemon',
'spawn_brokerd',
'maybe_spawn_brokerd',
'spawn_emsd',
'maybe_open_emsd',
'open_piker_runtime',
'maybe_open_pikerd',
'open_pikerd',
'get_runtime_vars',
]

View File

@ -100,7 +100,7 @@ async def open_piker_runtime(
or [_default_reg_addr]
)
if ems := tractor_kwargs.get('enable_modules'):
if ems := tractor_kwargs.pop('enable_modules', None):
# import pdbp; pdbp.set_trace()
enable_modules.extend(ems)
@ -175,14 +175,20 @@ async def open_pikerd(
alive underling services (see below).
'''
# NOTE: for the root daemon we always enable the root
# mod set and we `list.extend()` it into wtv the
# caller requested.
# TODO: make this mod set more strict?
# -[ ] eventually we should be able to avoid
# having the root have more then permissions to spawn other
# specialized daemons I think?
ems: list[str] = kwargs.setdefault('enable_modules', [])
ems.extend(_root_modules)
async with (
open_piker_runtime(
name=_root_dname,
# TODO: eventually we should be able to avoid
# having the root have more then permissions to
# spawn other specialized daemons I think?
enable_modules=_root_modules,
loglevel=loglevel,
debug_mode=debug_mode,
registry_addrs=registry_addrs,

View File

@ -15,8 +15,8 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Supervisor for ``docker`` with included async and SC wrapping
to ensure a cancellable container lifetime system.
Supervisor for ``docker`` with included async and SC wrapping to
ensure a cancellable container lifetime system.
'''
from __future__ import annotations

View File

@ -27,6 +27,12 @@ from typing import (
import trio
from trio_typing import TaskStatus
import tractor
from tractor import (
current_actor,
ContextCancelled,
Context,
Portal,
)
from ._util import (
log, # sub-sys logger
@ -38,6 +44,8 @@ from ._util import (
# library.
# - wrap a "remote api" wherein you can get a method proxy
# to the pikerd actor for starting services remotely!
# - prolly rename this to ActorServicesNursery since it spawns
# new actors and supervises them to completion?
class Services:
actor_n: tractor._supervise.ActorNursery
@ -47,7 +55,7 @@ class Services:
str,
tuple[
trio.CancelScope,
tractor.Portal,
Portal,
trio.Event,
]
] = {}
@ -57,12 +65,12 @@ class Services:
async def start_service_task(
self,
name: str,
portal: tractor.Portal,
portal: Portal,
target: Callable,
allow_overruns: bool = False,
**ctx_kwargs,
) -> (trio.CancelScope, tractor.Context):
) -> (trio.CancelScope, Context):
'''
Open a context in a service sub-actor, add to a stack
that gets unwound at ``pikerd`` teardown.
@ -101,13 +109,30 @@ class Services:
# wait on any context's return value
# and any final portal result from the
# sub-actor.
ctx_res = await ctx.result()
ctx_res: Any = await ctx.result()
# NOTE: blocks indefinitely until cancelled
# either by error from the target context
# function or by being cancelled here by the
# surrounding cancel scope.
return (await portal.result(), ctx_res)
except ContextCancelled as ctxe:
canceller: tuple[str, str] = ctxe.canceller
our_uid: tuple[str, str] = current_actor().uid
if (
canceller != portal.channel.uid
and
canceller != our_uid
):
log.cancel(
f'Actor-service {name} was remotely cancelled?\n'
f'remote canceller: {canceller}\n'
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
)
else:
raise
finally:
await portal.cancel_actor()

View File

@ -27,6 +27,7 @@ from typing import (
)
import tractor
from tractor import Portal
from ._util import (
log, # sub-sys logger
@ -140,7 +141,11 @@ async def find_service(
first_only: bool = True,
) -> tractor.Portal | None:
) -> (
Portal
| list[Portal]
| None
):
reg_addrs: list[tuple[str, int]]
async with open_registry(
@ -153,6 +158,9 @@ async def find_service(
),
) as reg_addrs:
log.info(f'Scanning for service `{service_name}`')
maybe_portals: list[Portal] | Portal | None
# attach to existing daemon by name if possible
async with tractor.find_actor(
service_name,

View File

@ -24,8 +24,13 @@ from __future__ import annotations
# AsyncExitStack,
# )
from pathlib import Path
from math import copysign
import time
from types import ModuleType
from typing import (
Any,
TYPE_CHECKING,
)
import polars as pl
import numpy as np
@ -42,15 +47,17 @@ from piker.data import (
ShmArray,
)
from piker import tsp
from . import (
log,
)
from piker.data._formatters import BGM
from . import log
from . import (
__tsdbs__,
open_storage_client,
StorageClient,
)
if TYPE_CHECKING:
from piker.ui._remote_ctl import AnnotCtl
store = typer.Typer()
@ -203,10 +210,12 @@ def anal(
deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
deduped,
diff,
) = tsp.dedupe(shm_df)
) = tsp.dedupe(
shm_df,
period=period,
)
write_edits: bool = True
if (
@ -217,7 +226,6 @@ def anal(
)
):
await tractor.pause()
await client.write_ohlcv(
fqme,
ohlcv=deduped,
@ -229,10 +237,122 @@ def anal(
# is there something more minimal but nearly as
# functional as ipython?
await tractor.pause()
assert not null_segs
trio.run(main)
async def markup_gaps(
fqme: str,
timeframe: float,
actl: AnnotCtl,
wdts: pl.DataFrame,
gaps: pl.DataFrame,
) -> dict[int, dict]:
'''
Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
with rectangles.
'''
aids: dict[int] = {}
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
# the gap's RIGHT-most bar's OPEN value
# at that time (sample) step.
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# dt_end_t: float = dt.timestamp()
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's LEFT-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = wdts.filter(
pl.col('index') == iend - 1
)
# XXX: probably a gap in the (newly sorted or de-duplicated)
# dt-df, so we might need to re-index first..
if prev_r.is_empty():
await tractor.pause()
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
# gap_w: float = abs((iend - istart))
# if gap_w < 6:
# margin: float = 6
# iend += margin
# istart -= margin
rect_gap: float = BGM*3/8
opn: float = row['open'][0]
ro: tuple[float, float] = (
# dt_end_t,
iend + rect_gap + 1,
opn,
)
cls: float = prev_r['close'][0]
lc: tuple[float, float] = (
# dt_start_t,
istart - rect_gap, # + 1 ,
cls,
)
color: str = 'dad_blue'
diff: float = cls - opn
sgn: float = copysign(1, diff)
color: str = {
-1: 'buy_green',
1: 'sell_red',
}[sgn]
rect_kwargs: dict[str, Any] = dict(
fqme=fqme,
timeframe=timeframe,
start_pos=lc,
end_pos=ro,
color=color,
)
aid: int = await actl.add_rect(**rect_kwargs)
assert aid
aids[aid] = rect_kwargs
# tell chart to redraw all its
# graphics view layers Bo
await actl.redraw(
fqme=fqme,
timeframe=timeframe,
)
return aids
@store.command()
def ldshm(
fqme: str,
@ -249,7 +369,6 @@ def ldshm(
async def main():
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
actl: AnnotCtl
mod: ModuleType
@ -274,111 +393,89 @@ def ldshm(
shm_df,
) in tsp.iter_dfs_from_shms(fqme):
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
period_s: float = float(times[-1] - times[-2])
if period_s < 1.:
d1: float = float(times[-1] - times[-2])
d2: float = float(times[-2] - times[-3])
med: float = np.median(np.diff(times))
if (
d1 < 1.
and d2 < 1.
and med < 1.
):
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
# over-write back to shm?
df: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
deduped,
diff,
) = tsp.dedupe(shm_df)
period_s: float = float(max(d1, d2, med))
null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)
needs_correction: bool = (
not gaps.is_empty()
or null_segs
# TODO: call null-seg fixer somehow?
if null_segs:
await tractor.pause()
# async with (
# trio.open_nursery() as tn,
# mod.open_history_client(
# mkt,
# ) as (get_hist, config),
# ):
# nulls_detected: trio.Event = await tn.start(partial(
# tsp.maybe_fill_null_segments,
# shm=shm,
# timeframe=timeframe,
# get_hist=get_hist,
# sampler_stream=sampler_stream,
# mkt=mkt,
# ))
# over-write back to shm?
wdts: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
wdts,
deduped,
diff,
) = tsp.dedupe(
shm_df,
period=period_s,
)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if needs_correction:
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# detect gaps from in expected (uniform OHLC) sample period
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
)
# the gap's right-most bar's OPEN value
# at that time (sample) step.
# dt_end_t: float = dt.timestamp()
# TODO: by default we always want to mark these up
# with rects showing up/down gaps Bo
venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
deduped,
expect_period=period_s,
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# TODO: actually pull the exact duration
# expected for each venue operational period?
gap_dt_unit='days',
gap_thresh=1,
)
# the gap's left-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = df.filter(
pl.col('index') == iend - 1
)
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
)
# async with actl.open_rect(
# ) as aid:
aid: int = await actl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
# write to parquet file?
if (
write_parquet
):
# write to fs
start = time.time()
# TODO: find the disjoint set of step gaps from
# venue (closure) set!
# -[ ] do a set diff by checking for the unique
# gap set only in the step_gaps?
if (
not venue_gaps.is_empty()
or (
period_s < 60
and not step_gaps.is_empty()
)
):
# write repaired ts to parquet-file?
if write_parquet:
start: float = time.time()
path: Path = await client.write_ohlcv(
fqme,
ohlcv=deduped,
@ -390,7 +487,7 @@ def ldshm(
)
# read back from fs
start = time.time()
start: float = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
read_delay: float = round(
time.time() - start,
@ -419,17 +516,32 @@ def ldshm(
update_first=False, # don't update ._first
)
await tractor.pause()
assert diff
do_markup_gaps: bool = True
if do_markup_gaps:
new_df: pl.DataFrame = tsp.np2pl(new)
aids: dict = await markup_gaps(
fqme,
period_s,
actl,
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
await tractor.pause()
assert aids
else:
# allow interaction even when no ts problems.
await tractor.pause()
assert not diff
# assert not diff
if df is None:
log.error(f'No matching shm buffers for {fqme} ?')
if shm_df is None:
log.error(
f'No matching shm buffers for {fqme} ?'
)
trio.run(main)

View File

@ -18,24 +18,12 @@
Toolz for debug, profile and trace of the distributed runtime :surfer:
'''
from .debug import (
open_crash_handler,
from tractor.devx import (
open_crash_handler as open_crash_handler,
)
from .profile import (
Profiler,
pg_profile_enabled,
ms_slower_then,
timeit,
Profiler as Profiler,
pg_profile_enabled as pg_profile_enabled,
ms_slower_then as ms_slower_then,
timeit as timeit,
)
# TODO: other mods to include?
# - DROP .trionics, already moved into tractor
# - move in `piker.calc`
__all__: list[str] = [
'open_crash_handler',
'pg_profile_enabled',
'ms_slower_then',
'Profiler',
'timeit',
]

View File

@ -1,40 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Debugger wrappers for `pdbp` as used by `tractor`.
'''
from contextlib import contextmanager as cm
import pdbp
# TODO: better naming and what additionals?
# - optional runtime plugging?
# - detection for sync vs. async code?
# - specialized REPL entry when in distributed mode?
@cm
def open_crash_handler():
'''
Super basic crash handler using `pdbp` debugger.
'''
try:
yield
except BaseException:
pdbp.xpm()
raise

View File

@ -51,10 +51,11 @@ from pendulum import (
import numpy as np
import polars as pl
from ..accounting import (
from piker.brokers import NoData
from piker.accounting import (
MktPair,
)
from ..data._util import (
from piker.data._util import (
log,
)
from ..data._sharedmem import (
@ -302,16 +303,28 @@ async def maybe_fill_null_segments(
gap: np.ndarray = shm._array[istart:istop]
# copy the oldest OHLC samples forward
gap[ohlc_fields] = shm._array[istart]['close']
cls: float = shm._array[istart]['close']
# TODO: how can we mark this range as being a gap tho?
# -[ ] maybe pg finally supports nulls in ndarray to
# show empty space somehow?
# -[ ] we could put a special value in the vlm or
# another col/field to denote?
gap[ohlc_fields] = cls
start_t: float = shm._array[istart]['time']
t_diff: float = (istop - istart)*timeframe
gap['time'] = np.arange(
start=start_t,
stop=start_t + t_diff,
step=timeframe,
)
# TODO: reimpl using the new `.ui._remote_ctl` ctx
# ideally using some kinda decent
# tractory-reverse-lookup-connnection from some other
# `Context` type thingy?
await sampler_stream.send({
'broadcast_all': {
@ -332,11 +345,11 @@ async def maybe_fill_null_segments(
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# await tractor.pause()
async def start_backfill(
get_hist,
frame_types: dict[str, Duration] | None,
mod: ModuleType,
mkt: MktPair,
shm: ShmArray,
@ -381,7 +394,6 @@ async def start_backfill(
60: {'years': 6},
}
period_duration: int = periods[timeframe]
update_start_on_prepend = True
# NOTE: manually set the "latest" datetime which we intend to
@ -415,6 +427,28 @@ async def start_backfill(
end_dt=last_start_dt,
)
except NoData as _daterr:
# 3 cases:
# - frame in the middle of a legit venue gap
# - history actually began at the `last_start_dt`
# - some other unknown error (ib blocking the
# history bc they don't want you seeing how they
# cucked all the tinas..)
if dur := frame_types.get(timeframe):
# decrement by a frame's worth of duration and
# retry a few times.
last_start_dt.subtract(
seconds=dur.total_seconds()
)
log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
)
continue
# broker says there never was or is no more history to pull
except DataUnavailable:
log.warning(
@ -871,7 +905,7 @@ async def tsdb_backfill(
):
log.info(
f'`{mod}` history client returned backfill config:\n'
f'{config}\n'
f'{pformat(config)}\n'
)
dt_eps: list[DateTime, DateTime] = []
@ -943,6 +977,7 @@ async def tsdb_backfill(
partial(
start_backfill,
get_hist=get_hist,
frame_types=config.get('frame_types', None),
mod=mod,
mkt=mkt,
shm=shm,

View File

@ -21,15 +21,16 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
types.
'''
from __future__ import annotations
from collections import UserList
from pprint import (
pformat,
saferepr,
)
from typing import Any
from msgspec import (
msgpack,
Struct,
Struct as _Struct,
structs,
)
@ -62,7 +63,7 @@ class DiffDump(UserList):
class Struct(
Struct,
_Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
@ -72,9 +73,27 @@ class Struct(
A "human friendlier" (aka repl buddy) struct subtype.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict(
self,
include_non_members: bool = True,
) -> dict:
'''
Like it sounds.. direct delegation to:
@ -90,16 +109,72 @@ class Struct(
# only return a dict of the struct members
# which were provided as input, NOT anything
# added as `@properties`!
# added as type-defined `@property` methods!
sin_props: dict = {}
for fi in structs.fields(self):
key: str = fi.name
sin_props[key] = asdict[key]
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
sin_props[k] = asdict[k]
return sin_props
def pformat(self) -> str:
return f'Struct({pformat(self.to_dict())})'
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# __str__ = __repr__ = pformat
__repr__ = pformat
def copy(
self,

View File

@ -50,15 +50,16 @@ from ._display import DisplayState
from ._interaction import ChartView
from ._editors import SelectRect
from ._chart import ChartPlotWidget
from ._dataviz import Viz
log = get_logger(__name__)
# NOTE: this is set by the `._display.graphics_update_loop()` once
# all chart widgets / Viz per flume have been initialized allowing
# for remote annotation (control) of any chart-actor's mkt feed by
# fqme lookup Bo
_dss: dict[str, DisplayState] | None = None
# NOTE: this is UPDATED by the `._display.graphics_update_loop()`
# once all chart widgets / Viz per flume have been initialized
# allowing for remote annotation (control) of any chart-actor's mkt
# feed by fqme lookup Bo
_dss: dict[str, DisplayState] = {}
# stash each and every client connection so that they can all
# be cancelled on shutdown/error.
@ -96,7 +97,7 @@ async def serve_rc_annots(
async for msg in annot_req_stream:
match msg:
case {
'annot': 'SelectRect',
'cmd': 'SelectRect',
'fqme': fqme,
'timeframe': timeframe,
'meth': str(meth),
@ -110,14 +111,6 @@ async def serve_rc_annots(
}[timeframe]
cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
@ -143,7 +136,8 @@ async def serve_rc_annots(
await annot_req_stream.send(aid)
case {
'rm_annot': int(aid),
'cmd': 'remove',
'aid': int(aid),
}:
# NOTE: this is normally entered on
# a client's annotation de-alloc normally
@ -156,19 +150,29 @@ async def serve_rc_annots(
await annot_req_stream.send(aid)
case {
'cmd': 'redraw',
'fqme': fqme,
'render': int(aid),
'viz_name': str(viz_name),
'timeframe': timeframe,
# TODO: maybe more fields?
# 'render': int(aid),
# 'viz_name': str(viz_name),
}:
# NOTE: old match from the 60s display loop task
# | {
# 'backfilling': (str(viz_name), timeframe),
# }:
ds: DisplayState = _dss[viz_name]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
ds: DisplayState = _dss[fqme]
viz: Viz = {
60: ds.hist_viz,
1: ds.viz,
}[timeframe]
log.warning(
f'Forcing VIZ REDRAW:\n'
f'fqme: {fqme}\n'
f'timeframe: {timeframe}\n'
)
viz.reset_graphics()
case _:
log.error(
@ -227,6 +231,18 @@ class AnnotCtl(Struct):
# ids to their equivalent IPC msg-streams.
_ipcs: dict[int, MsgStream] = {}
def _get_ipc(
self,
fqme: str,
) -> MsgStream:
ipc: MsgStream = self.fqme2ipc.get(fqme)
if ipc is None:
raise SymbolNotFound(
'No chart (actor) seems to have mkt feed loaded?\n'
f'{fqme}'
)
return ipc
async def add_rect(
self,
fqme: str,
@ -246,16 +262,10 @@ class AnnotCtl(Struct):
the instances `id(obj)` from the remote UI actor.
'''
ipc: MsgStream = self.fqme2ipc.get(fqme)
if ipc is None:
raise SymbolNotFound(
'No chart (actor) seems to have mkt feed loaded?\n'
f'{fqme}'
)
ipc: MsgStream = self._get_ipc(fqme)
await ipc.send({
'fqme': fqme,
'annot': 'SelectRect',
'cmd': 'SelectRect',
'timeframe': timeframe,
# 'meth': str(meth),
'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos',
@ -288,7 +298,8 @@ class AnnotCtl(Struct):
'''
ipc: MsgStream = self._ipcs[aid]
await ipc.send({
'rm_annot': aid,
'cmd': 'remove',
'aid': aid,
})
removed: bool = await ipc.receive()
return removed
@ -307,6 +318,19 @@ class AnnotCtl(Struct):
finally:
await self.remove(aid)
async def redraw(
self,
fqme: str,
timeframe: float,
) -> None:
await self._get_ipc(fqme).send({
'cmd': 'redraw',
'fqme': fqme,
# 'render': int(aid),
# 'viz_name': str(viz_name),
'timeframe': timeframe,
})
# TODO: do we even need this?
# async def modify(
# self,

View File

@ -358,7 +358,7 @@ class OrderMode:
send_msg: bool = True,
order: Order | None = None,
) -> Dialog:
) -> Dialog | None:
'''
Send execution order to EMS return a level line to
represent the order on a chart.
@ -378,6 +378,16 @@ class OrderMode:
'oid': oid,
})
if order.price <= 0:
log.error(
'*!? Invalid `Order.price <= 0` ?!*\n'
# TODO: make this present multi-line in object form
# like `ib_insync.contracts.Contract.__repr__()`
f'{order}\n'
)
self.cancel_orders([order.oid])
return None
lines = self.lines_from_order(
order,
show_markers=True,
@ -663,7 +673,7 @@ class OrderMode:
self,
msg: Status,
) -> Dialog:
) -> Dialog | None:
# NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded.
order = msg.req
@ -694,12 +704,15 @@ class OrderMode:
fqsn=fqme,
info={},
)
dialog = self.submit_order(
maybe_dialog: Dialog | None = self.submit_order(
send_msg=False,
order=order,
)
assert self.dialogs[oid] == dialog
return dialog
if maybe_dialog is None:
return None
assert self.dialogs[oid] == maybe_dialog
return maybe_dialog
@asynccontextmanager
@ -1079,8 +1092,25 @@ async def process_trade_msg(
)
):
msg.req = order
dialog = mode.load_unknown_dialog_from_msg(msg)
mode.on_submit(oid)
dialog: (
Dialog
# NOTE: on an invalid order submission (eg.
# price <=0) the downstream APIs may return
# a null.
| None
) = mode.load_unknown_dialog_from_msg(msg)
# cancel any invalid pre-existing order!
if dialog is None:
log.warning(
'Order was ignored/invalid?\n'
f'{order}'
)
# if valid, display the order line the same as if
# it was submitted during this UI session.
else:
mode.on_submit(oid)
case Status(resp='error'):