Compare commits


No commits in common. "3d03781810707520a82278ea541f80c6178b7074" and "9be29a707d60095aadd72d87851f0644a46a8483" have entirely different histories.

17 changed files with 316 additions and 594 deletions

View File

@@ -201,7 +201,6 @@ class FutesPair(Pair):
         match contype:
             case (
                 'CURRENT_QUARTER'
-                | 'CURRENT_QUARTER DELIVERING'
                 | 'NEXT_QUARTER'  # su madre binance..
             ):
                 pair, _, expiry = symbol.partition('_')
@@ -221,10 +220,6 @@ class FutesPair(Pair):
             case ['DEFI']:
                 return 'PERP'
 
-        # wow, just wow you binance guys suck..
-        if self.status == 'PENDING_TRADING':
-            return 'PENDING'
-
         # XXX: yeah no clue then..
         raise ValueError(
             f'Bad .expiry token match: {contype} for {symbol}'
@@ -242,7 +237,6 @@ class FutesPair(Pair):
             case (
                 'CURRENT_QUARTER'
-                | 'CURRENT_QUARTER DELIVERING'
                 | 'NEXT_QUARTER'  # su madre binance..
             ):
                 _, _, expiry = symbol.partition('_')
@@ -255,10 +249,7 @@ class FutesPair(Pair):
                 return f'{margin}M'
 
         match subtype:
-            case (
-                ['DEFI']
-                | ['USDC']
-            ):
+            case ['DEFI']:
                 return f'{subtype[0]}'
 
         # XXX: yeah no clue then..

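For context on the case arms being trimmed here: Python's `match` statement allows "or-patterns" with `|`, so one `case` can cover several literal contract-type strings, while `['DEFI']` is a sequence pattern matching a one-element list. A minimal standalone sketch (the token values mirror the Binance contract types above; the helper name is hypothetical):

def expiry_from_contype(contype: str, symbol: str) -> str:
    # several string literals share one arm via `|` or-patterns
    match contype:
        case (
            'CURRENT_QUARTER'
            | 'NEXT_QUARTER'
        ):
            # e.g. 'BTCUSDT_231229' -> ('BTCUSDT', '_', '231229')
            pair, _, expiry = symbol.partition('_')
            return expiry

        case ['DEFI']:
            # sequence pattern: matches exactly a one-element list
            return 'PERP'

    raise ValueError(f'Bad .expiry token match: {contype} for {symbol}')

print(expiry_from_contype('NEXT_QUARTER', 'BTCUSDT_231229'))  # -> '231229'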
View File

@@ -482,7 +482,7 @@ def search(
         ):
             return await func()
 
-    from piker.toolz import open_crash_handler
+    from cornerboi._debug import open_crash_handler
     with open_crash_handler():
         quotes = trio.run(
             main,
@@ -506,11 +506,9 @@ def search(
 @click.option('--delete', '-d', flag_value=True, help='Delete section')
 @click.pass_obj
 def brokercfg(config, section, value, delete):
-    '''
-    If invoked with no arguments, open an editor to edit broker
-    configs file or get / update an individual section.
-
-    '''
+    """If invoked with no arguments, open an editor to edit broker configs file
+    or get / update an individual section.
+    """
     from .. import config
 
     if section:

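The `brokercfg` command above relies on click's context-object passing via `@click.pass_obj`; a minimal self-contained sketch of the same wiring (group and option names here are illustrative only, not the project's actual CLI tree):

import click

@click.group()
@click.pass_context
def cli(ctx):
    # stash a shared config object for all subcommands
    ctx.obj = {'loglevel': 'info'}

@cli.command()
@click.argument('section', required=False)
@click.option('--delete', '-d', flag_value=True, help='Delete section')
@click.pass_obj
def brokercfg(config, section, delete):
    """If invoked with no arguments, open an editor to edit broker
    configs file or get / update an individual section.
    """
    click.echo(f'config={config} section={section} delete={delete}')

if __name__ == '__main__':
    cli()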
View File

@@ -434,8 +434,11 @@ async def get_bars(
                 # current impl) to detect a cancel case.
                 # timeout=timeout,
             )
-            # usually either a request during a venue closure
-            # or into a large (weekend) closure gap.
+
+            # not enough bars signal, likely due to venue
+            # operational gaps.
+            # too_little: bool = False
+            if end_dt:
             if not bars:
                 # no data returned?
                 log.warning(
@@ -444,13 +447,14 @@ async def get_bars(
                     f'end_dt: {end_dt}\n'
                     f'duration: {dt_duration}\n'
                 )
-                # NOTE: REQUIRED to pass back value..
                 result = None
                 return None
+                # raise NoData(
+                #     f'{fqme}\n'
+                #     f'end_dt:{end_dt}\n'
+                # )
 
-            # not enough bars signal, likely due to venue
-            # operational gaps.
-            if end_dt:
+            else:
                 dur_s: float = len(bars) * timeframe
                 bars_dur = Duration(seconds=dur_s)
                 dt_dur_s: float = dt_duration.in_seconds()

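The retained logic compares the duration actually covered by the returned bars against the requested window using `pendulum.Duration`; a standalone sketch of that arithmetic (the sample values are made up):

from pendulum import Duration

timeframe: float = 60.0          # seconds per bar
bars_returned: int = 900         # what the venue gave us

dur_s: float = bars_returned * timeframe
bars_dur = Duration(seconds=dur_s)

dt_duration = Duration(days=1)   # what we asked for
dt_dur_s: float = dt_duration.in_seconds()

if dur_s < dt_dur_s:
    # venue returned less history than requested: likely a
    # closure gap (weekend/holiday) rather than a hard error
    print(f'short frame: {bars_dur} < {dt_duration}')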
View File

@@ -70,18 +70,12 @@ _symbol_info_translation: dict[str, str] = {
 
 def get_config() -> dict[str, Any]:
-    '''
-    Load our section from `piker/brokers.toml`.
-
-    '''
-    conf, path = config.load(
-        conf_name='brokers',
-        touch_if_dne=True,
-    )
-    if (section := conf.get('kraken')) is None:
-        log.warning(
-            f'No config section found for kraken in {path}'
-        )
+    conf, path = config.load()
+    section = conf.get('kraken')
+
+    if section is None:
+        log.warning(f'No config section found for kraken in {path}')
         return {}
 
     return section

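Both sides implement the same defensive section lookup, one of them via the walrus operator; a tiny sketch of that pattern against a plain dict standing in for the parsed TOML (the 'kraken' key mirrors the section above):

conf: dict = {'kraken': {'api_key': 'xyz'}}

# assignment-expression form: bind and test in one line
if (section := conf.get('kraken')) is None:
    section = {}   # fall back to an empty section

assert section == {'api_key': 'xyz'}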
View File

@@ -26,7 +26,6 @@ from contextlib import asynccontextmanager as acm
 from datetime import datetime
 from operator import itemgetter
 import itertools
-from pprint import pformat
 import time
 from typing import (
     Callable,
@@ -698,12 +697,7 @@ async def open_trade_dialog(
     # sanity check all the mkt infos
     for fqme, flume in feed.flumes.items():
         mkt: MktPair = symcache.mktmaps.get(fqme) or mkt_by_fqme[fqme]
-        if mkt != flume.mkt:
-            diff: tuple = mkt - flume.mkt
-            log.warning(
-                'MktPair sig mismatch?\n'
-                f'{pformat(diff)}'
-            )
+        assert mkt == flume.mkt
 
     get_cost: Callable = getattr(
         brokermod,

View File

@@ -14,45 +14,49 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
-'''
-Actor runtime primtives and (distributed) service APIs for,
-
-- daemon-service mgmt: `_daemon` (i.e. low-level spawn and supervise machinery
-  for sub-actors like `brokerd`, `emsd`, datad`, etc.)
-
-- service-actor supervision (via `trio` tasks) API: `._mngr`
-
-- discovery interface (via light wrapping around `tractor`'s built-in
-  prot): `._registry`
-
-- `docker` cntr SC supervision for use with `trio`: `_ahab`
-  - wrappers for marketstore and elasticsearch dbs
-    => TODO: maybe to (re)move elsewhere?
-
-'''
-from ._mngr import Services as Services
-from ._registry import (
-    _tractor_kwargs as _tractor_kwargs,
-    _default_reg_addr as _default_reg_addr,
-    _default_registry_host as _default_registry_host,
-    _default_registry_port as _default_registry_port,
-    open_registry as open_registry,
-    find_service as find_service,
-    check_for_service as check_for_service,
-)
-from ._daemon import (
-    maybe_spawn_daemon as maybe_spawn_daemon,
-    spawn_emsd as spawn_emsd,
-    maybe_open_emsd as maybe_open_emsd,
-)
-from ._actor_runtime import (
-    open_piker_runtime as open_piker_runtime,
-    maybe_open_pikerd as maybe_open_pikerd,
-    open_pikerd as open_pikerd,
-    get_runtime_vars as get_runtime_vars,
-)
-from ..brokers._daemon import (
-    spawn_brokerd as spawn_brokerd,
-    maybe_spawn_brokerd as maybe_spawn_brokerd,
-)
+"""
+Actor-runtime service orchestration machinery.
+
+"""
+from __future__ import annotations
+
+from ._mngr import Services
+from ._registry import (  # noqa
+    _tractor_kwargs,
+    _default_reg_addr,
+    _default_registry_host,
+    _default_registry_port,
+    open_registry,
+    find_service,
+    check_for_service,
+)
+from ._daemon import (  # noqa
+    maybe_spawn_daemon,
+    spawn_emsd,
+    maybe_open_emsd,
+)
+from ._actor_runtime import (
+    open_piker_runtime,
+    maybe_open_pikerd,
+    open_pikerd,
+    get_runtime_vars,
+)
+from ..brokers._daemon import (
+    spawn_brokerd,
+    maybe_spawn_brokerd,
+)
+
+__all__ = [
+    'check_for_service',
+    'Services',
+    'maybe_spawn_daemon',
+    'spawn_brokerd',
+    'maybe_spawn_brokerd',
+    'spawn_emsd',
+    'maybe_open_emsd',
+    'open_piker_runtime',
+    'maybe_open_pikerd',
+    'open_pikerd',
+    'get_runtime_vars',
+]

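The two sides show competing re-export conventions: one uses the redundant-looking `import x as x` alias form, which type checkers treat as an explicit re-export (the convention PEP 484 documents for stubs), while the other relies on a module-level `__all__` plus `# noqa` to silence unused-import lints. A minimal sketch of both styles, assuming a hypothetical `_mod` submodule:

# style A: alias re-export; `foo` is considered public by checkers
from _mod import foo as foo

# style B: plain import plus __all__; linters need the noqa hint
from _mod import bar  # noqa

__all__ = ['bar']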
View File

@@ -100,7 +100,7 @@ async def open_piker_runtime(
         or [_default_reg_addr]
     )
 
-    if ems := tractor_kwargs.pop('enable_modules', None):
+    if ems := tractor_kwargs.get('enable_modules'):
         # import pdbp; pdbp.set_trace()
         enable_modules.extend(ems)
 
@@ -175,20 +175,14 @@ async def open_pikerd(
     alive underling services (see below).
 
     '''
-    # NOTE: for the root daemon we always enable the root
-    # mod set and we `list.extend()` it into wtv the
-    # caller requested.
-    # TODO: make this mod set more strict?
-    # -[ ] eventually we should be able to avoid
-    #     having the root have more then permissions to spawn other
-    #     specialized daemons I think?
-    ems: list[str] = kwargs.setdefault('enable_modules', [])
-    ems.extend(_root_modules)
-
     async with (
         open_piker_runtime(
             name=_root_dname,
+            # TODO: eventually we should be able to avoid
+            # having the root have more then permissions to
+            # spawn other specialized daemons I think?
+            enable_modules=_root_modules,
             loglevel=loglevel,
             debug_mode=debug_mode,
             registry_addrs=registry_addrs,

View File

@@ -15,8 +15,8 @@
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 '''
-Supervisor for ``docker`` with included async and SC wrapping to
-ensure a cancellable container lifetime system.
+Supervisor for ``docker`` with included async and SC wrapping
+to ensure a cancellable container lifetime system.
 
 '''
 from __future__ import annotations

View File

@@ -27,12 +27,6 @@ from typing import (
 )
 
 import trio
 from trio_typing import TaskStatus
 import tractor
-from tractor import (
-    current_actor,
-    ContextCancelled,
-    Context,
-    Portal,
-)
 
 from ._util import (
     log,  # sub-sys logger
@@ -44,8 +38,6 @@ from ._util import (
 # library.
 # - wrap a "remote api" wherein you can get a method proxy
 #   to the pikerd actor for starting services remotely!
-# - prolly rename this to ActorServicesNursery since it spawns
-#   new actors and supervises them to completion?
 class Services:
 
     actor_n: tractor._supervise.ActorNursery
@@ -55,7 +47,7 @@ class Services:
         str,
         tuple[
             trio.CancelScope,
-            Portal,
+            tractor.Portal,
             trio.Event,
         ]
     ] = {}
@@ -65,12 +57,12 @@ class Services:
     async def start_service_task(
         self,
         name: str,
-        portal: Portal,
+        portal: tractor.Portal,
         target: Callable,
         allow_overruns: bool = False,
         **ctx_kwargs,
 
-    ) -> (trio.CancelScope, Context):
+    ) -> (trio.CancelScope, tractor.Context):
         '''
         Open a context in a service sub-actor, add to a stack
         that gets unwound at ``pikerd`` teardown.
@@ -109,30 +101,13 @@ class Services:
                     # wait on any context's return value
                     # and any final portal result from the
                     # sub-actor.
-                    ctx_res: Any = await ctx.result()
+                    ctx_res = await ctx.result()
 
                     # NOTE: blocks indefinitely until cancelled
                     # either by error from the target context
                     # function or by being cancelled here by the
                     # surrounding cancel scope.
                     return (await portal.result(), ctx_res)
 
-                except ContextCancelled as ctxe:
-                    canceller: tuple[str, str] = ctxe.canceller
-                    our_uid: tuple[str, str] = current_actor().uid
-                    if (
-                        canceller != portal.channel.uid
-                        and
-                        canceller != our_uid
-                    ):
-                        log.cancel(
-                            f'Actor-service {name} was remotely cancelled?\n'
-                            f'remote canceller: {canceller}\n'
-                            f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
-                        )
-                    else:
-                        raise
-
                 finally:
                     await portal.cancel_actor()

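The `Services` class above pairs each spawned sub-actor context with a `trio.CancelScope` and a completion `trio.Event` so teardown can cancel and then await each task. A drastically simplified, tractor-free sketch of that bookkeeping using only `trio` (all names here are hypothetical):

import trio

class MiniServices:
    '''Track long-lived service tasks: name -> (cancel scope, done event).'''

    def __init__(self, nursery: trio.Nursery):
        self._n = nursery
        self.service_tasks: dict[str, tuple[trio.CancelScope, trio.Event]] = {}

    async def start_service_task(self, name: str, target) -> trio.CancelScope:
        cs = trio.CancelScope()
        complete = trio.Event()

        async def _run():
            # run the service under its own cancel scope and
            # always flag completion for any waiter
            with cs:
                try:
                    await target()
                finally:
                    complete.set()

        self._n.start_soon(_run)
        self.service_tasks[name] = (cs, complete)
        return cs

    async def cancel_service(self, name: str):
        cs, complete = self.service_tasks.pop(name)
        cs.cancel()
        await complete.wait()

async def main():
    async with trio.open_nursery() as n:
        svcs = MiniServices(n)
        await svcs.start_service_task('ticker', trio.sleep_forever)
        await trio.sleep(0.1)
        await svcs.cancel_service('ticker')

trio.run(main)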
View File

@@ -27,7 +27,6 @@ from typing import (
 )
 
 import tractor
-from tractor import Portal
 
 from ._util import (
     log,  # sub-sys logger
@@ -141,11 +140,7 @@ async def find_service(
     first_only: bool = True,
 
-) -> (
-    Portal
-    | list[Portal]
-    | None
-):
+) -> tractor.Portal | None:
 
     reg_addrs: list[tuple[str, int]]
     async with open_registry(
@@ -158,9 +153,6 @@ async def find_service(
         ),
     ) as reg_addrs:
         log.info(f'Scanning for service `{service_name}`')
-
-        maybe_portals: list[Portal] | Portal | None
         # attach to existing daemon by name if possible
         async with tractor.find_actor(
             service_name,

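`find_service()` is a thin wrapper over `tractor`'s built-in discovery; the narrowed return annotation on one side reflects that with `first_only=True` the lookup yields at most a single portal. A rough sketch of the core call shape (the service name is illustrative, and this assumes a registry/arbiter actor is already running):

import tractor

async def lookup(service_name: str = 'pikerd'):
    # attach to an existing actor by name, if any
    async with tractor.find_actor(service_name) as portal:
        if portal is None:
            print(f'no `{service_name}` actor found')
        else:
            print(f'found `{service_name}` -> {portal}')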
View File

@@ -24,13 +24,8 @@ from __future__ import annotations
 #     AsyncExitStack,
 # )
 from pathlib import Path
-from math import copysign
 import time
 from types import ModuleType
-from typing import (
-    Any,
-    TYPE_CHECKING,
-)
 
 import polars as pl
 import numpy as np
@@ -47,17 +42,15 @@ from piker.data import (
     ShmArray,
 )
 from piker import tsp
-from piker.data._formatters import BGM
-from . import log
+from . import (
+    log,
+)
 from . import (
     __tsdbs__,
     open_storage_client,
     StorageClient,
 )
-
-if TYPE_CHECKING:
-    from piker.ui._remote_ctl import AnnotCtl
 
 store = typer.Typer()
@@ -210,12 +203,10 @@ def anal(
             deduped: pl.DataFrame  # deduplicated dts
             (
                 df,
-                gaps,
                 deduped,
                 diff,
-            ) = tsp.dedupe(
-                shm_df,
-                period=period,
-            )
+            ) = tsp.dedupe(shm_df)
 
             write_edits: bool = True
             if (
@@ -226,6 +217,7 @@ def anal(
                 )
             ):
                 await tractor.pause()
+
                 await client.write_ohlcv(
                     fqme,
                     ohlcv=deduped,
@@ -237,122 +229,10 @@ def anal(
             # is there something more minimal but nearly as
             # functional as ipython?
             await tractor.pause()
+            assert not null_segs
 
     trio.run(main)
 
-async def markup_gaps(
-    fqme: str,
-    timeframe: float,
-    actl: AnnotCtl,
-    wdts: pl.DataFrame,
-    gaps: pl.DataFrame,
-
-) -> dict[int, dict]:
-    '''
-    Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
-    with rectangles.
-
-    '''
-    aids: dict[int] = {}
-    for i in range(gaps.height):
-        row: pl.DataFrame = gaps[i]
-
-        # the gap's RIGHT-most bar's OPEN value
-        # at that time (sample) step.
-        iend: int = row['index'][0]
-        # dt: datetime = row['dt'][0]
-        # dt_prev: datetime = row['dt_prev'][0]
-        # dt_end_t: float = dt.timestamp()
-
-        # TODO: can we eventually remove this
-        # once we figure out why the epoch cols
-        # don't match?
-        # TODO: FIX HOW/WHY these aren't matching
-        # and are instead off by 4hours (EST
-        # vs. UTC?!?!)
-        # end_t: float = row['time']
-        # assert (
-        #     dt.timestamp()
-        #     ==
-        #     end_t
-        # )
-
-        # the gap's LEFT-most bar's CLOSE value
-        # at that time (sample) step.
-        prev_r: pl.DataFrame = wdts.filter(
-            pl.col('index') == iend - 1
-        )
-
-        # XXX: probably a gap in the (newly sorted or de-duplicated)
-        # dt-df, so we might need to re-index first..
-        if prev_r.is_empty():
-            await tractor.pause()
-
-        istart: int = prev_r['index'][0]
-        # dt_start_t: float = dt_prev.timestamp()
-        # start_t: float = prev_r['time']
-        # assert (
-        #     dt_start_t
-        #     ==
-        #     start_t
-        # )
-
-        # TODO: implement px-col width measure
-        # and ensure at least as many px-cols
-        # shown per rect as configured by user.
-        # gap_w: float = abs((iend - istart))
-        # if gap_w < 6:
-        #     margin: float = 6
-        #     iend += margin
-        #     istart -= margin
-
-        rect_gap: float = BGM*3/8
-        opn: float = row['open'][0]
-        ro: tuple[float, float] = (
-            # dt_end_t,
-            iend + rect_gap + 1,
-            opn,
-        )
-        cls: float = prev_r['close'][0]
-        lc: tuple[float, float] = (
-            # dt_start_t,
-            istart - rect_gap,  # + 1 ,
-            cls,
-        )
-
-        color: str = 'dad_blue'
-        diff: float = cls - opn
-        sgn: float = copysign(1, diff)
-        color: str = {
-            -1: 'buy_green',
-            1: 'sell_red',
-        }[sgn]
-
-        rect_kwargs: dict[str, Any] = dict(
-            fqme=fqme,
-            timeframe=timeframe,
-            start_pos=lc,
-            end_pos=ro,
-            color=color,
-        )
-        aid: int = await actl.add_rect(**rect_kwargs)
-        assert aid
-        aids[aid] = rect_kwargs
-
-    # tell chart to redraw all its
-    # graphics view layers Bo
-    await actl.redraw(
-        fqme=fqme,
-        timeframe=timeframe,
-    )
-    return aids
 
 @store.command()
 def ldshm(
     fqme: str,
@@ -369,6 +249,7 @@ def ldshm(
     async def main():
         from piker.ui._remote_ctl import (
             open_annot_ctl,
+            AnnotCtl,
         )
         actl: AnnotCtl
         mod: ModuleType
@@ -393,89 +274,111 @@ def ldshm(
             shm_df,
         ) in tsp.iter_dfs_from_shms(fqme):
 
-            times: np.ndarray = shm.array['time']
-            d1: float = float(times[-1] - times[-2])
-            d2: float = float(times[-2] - times[-3])
-            med: float = np.median(np.diff(times))
-            if (
-                d1 < 1.
-                and d2 < 1.
-                and med < 1.
-            ):
-                raise ValueError(
-                    f'Something is wrong with time period for {shm}:\n{times}'
-                )
-            period_s: float = float(max(d1, d2, med))
-
-            null_segs: tuple = tsp.get_null_segs(
-                frame=shm.array,
-                period=period_s,
-            )
-            # TODO: call null-seg fixer somehow?
-            if null_segs:
-                await tractor.pause()
-            # async with (
-            #     trio.open_nursery() as tn,
-            #     mod.open_history_client(
-            #         mkt,
-            #     ) as (get_hist, config),
-            # ):
-            #     nulls_detected: trio.Event = await tn.start(partial(
-            #         tsp.maybe_fill_null_segments,
-            #         shm=shm,
-            #         timeframe=timeframe,
-            #         get_hist=get_hist,
-            #         sampler_stream=sampler_stream,
-            #         mkt=mkt,
-            #     ))
-
-            # over-write back to shm?
-            wdts: pl.DataFrame  # with dts
-            deduped: pl.DataFrame  # deduplicated dts
-            (
-                wdts,
-                deduped,
-                diff,
-            ) = tsp.dedupe(
-                shm_df,
-                period=period_s,
-            )
-
-            # detect gaps from in expected (uniform OHLC) sample period
-            step_gaps: pl.DataFrame = tsp.detect_time_gaps(
-                deduped,
-                expect_period=period_s,
-            )
-
-            # TODO: by default we always want to mark these up
-            # with rects showing up/down gaps Bo
-            venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
-                deduped,
-                expect_period=period_s,
-
-                # TODO: actually pull the exact duration
-                # expected for each venue operational period?
-                gap_dt_unit='days',
-                gap_thresh=1,
-            )
-
-            # TODO: find the disjoint set of step gaps from
-            # venue (closure) set!
-            # -[ ] do a set diff by checking for the unique
-            #     gap set only in the step_gaps?
-            if (
-                not venue_gaps.is_empty()
-                or (
-                    period_s < 60
-                    and not step_gaps.is_empty()
-                )
-            ):
-                # write repaired ts to parquet-file?
-                if write_parquet:
-                    start: float = time.time()
+            # compute ohlc properties for naming
+            times: np.ndarray = shm.array['time']
+            period_s: float = float(times[-1] - times[-2])
+            if period_s < 1.:
+                raise ValueError(
+                    f'Something is wrong with time period for {shm}:\n{times}'
+                )
+
+            # over-write back to shm?
+            df: pl.DataFrame  # with dts
+            deduped: pl.DataFrame  # deduplicated dts
+            (
+                df,
+                gaps,
+                deduped,
+                diff,
+            ) = tsp.dedupe(shm_df)
+
+            null_segs: tuple = tsp.get_null_segs(
+                frame=shm.array,
+                period=period_s,
+            )
+
+            needs_correction: bool = (
+                not gaps.is_empty()
+                or null_segs
+            )
+            # TODO: maybe only optionally enter this depending
+            # on some CLI flags and/or gap detection?
+            if needs_correction:
+                for i in range(gaps.height):
+                    row: pl.DataFrame = gaps[i]
+
+                    # TODO: can we eventually remove this
+                    # once we figure out why the epoch cols
+                    # don't match?
+                    iend: int = row['index'][0]
+                    # dt: datetime = row['dt'][0]
+                    # dt_prev: datetime = row['dt_prev'][0]
+
+                    # the gap's right-most bar's OPEN value
+                    # at that time (sample) step.
+                    # dt_end_t: float = dt.timestamp()
+
+                    # TODO: FIX HOW/WHY these aren't matching
+                    # and are instead off by 4hours (EST
+                    # vs. UTC?!?!)
+                    # end_t: float = row['time']
+                    # assert (
+                    #     dt.timestamp()
+                    #     ==
+                    #     end_t
+                    # )
+
+                    # the gap's left-most bar's CLOSE value
+                    # at that time (sample) step.
+                    prev_r: pl.DataFrame = df.filter(
+                        pl.col('index') == iend - 1
+                    )
+                    istart: int = prev_r['index'][0]
+                    # dt_start_t: float = dt_prev.timestamp()
+                    # start_t: float = prev_r['time']
+                    # assert (
+                    #     dt_start_t
+                    #     ==
+                    #     start_t
+                    # )
+
+                    # TODO: implement px-col width measure
+                    # and ensure at least as many px-cols
+                    # shown per rect as configured by user.
+                    gap_w: float = abs((iend - istart))
+                    if gap_w < 6:
+                        margin: float = 6
+                        iend += margin
+                        istart -= margin
+
+                    ro: tuple[float, float] = (
+                        # dt_end_t,
+                        iend,
+                        row['open'][0],
+                    )
+                    lc: tuple[float, float] = (
+                        # dt_start_t,
+                        istart,
+                        prev_r['close'][0],
+                    )
+
+                    # async with actl.open_rect(
+                    # ) as aid:
+                    aid: int = await actl.add_rect(
+                        fqme=fqme,
+                        timeframe=period_s,
+                        start_pos=lc,
+                        end_pos=ro,
+                    )
+                    assert aid
 
+            # write to parquet file?
+            if (
+                write_parquet
+            ):
+                # write to fs
+                start = time.time()
                 path: Path = await client.write_ohlcv(
                     fqme,
                     ohlcv=deduped,
@@ -487,7 +390,7 @@ def ldshm(
                 )
 
                 # read back from fs
-                start: float = time.time()
+                start = time.time()
                 read_df: pl.DataFrame = pl.read_parquet(path)
                 read_delay: float = round(
                     time.time() - start,
@@ -516,32 +419,17 @@ def ldshm(
                     update_first=False,  # don't update ._first
                 )
 
-                do_markup_gaps: bool = True
-                if do_markup_gaps:
-                    new_df: pl.DataFrame = tsp.np2pl(new)
-                    aids: dict = await markup_gaps(
-                        fqme,
-                        period_s,
-                        actl,
-                        new_df,
-                        step_gaps,
-                    )
-
-                # last chance manual overwrites in REPL
                 await tractor.pause()
-                assert aids
+                assert diff
 
             else:
                 # allow interaction even when no ts problems.
                 await tractor.pause()
-                # assert not diff
+                assert not diff
 
-        if shm_df is None:
-            log.error(
-                f'No matching shm buffers for {fqme} ?'
-            )
+        if df is None:
+            log.error(f'No matching shm buffers for {fqme} ?')
 
     trio.run(main)

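Both versions of `ldshm` lean on `tsp.dedupe()` and gap frames; the core "step gap" idea reduces to diffing the epoch column and keeping rows whose delta exceeds the expected sample period. A toy polars sketch of that detection (column names mirror the OHLC frames above, the data is made up):

import polars as pl

period_s: float = 60.0
df = pl.DataFrame({
    'time': [0., 60., 120., 300., 360.],   # note the 120 -> 300 jump
    'close': [1., 2., 3., 4., 5.],
})

# rows whose time-delta from the previous row exceeds the period
gaps: pl.DataFrame = (
    df
    .with_columns(pl.col('time').diff().alias('dt'))
    .filter(pl.col('dt') > period_s)
)
print(gaps)  # the row at time=300. with dt=180.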
View File

@@ -18,12 +18,24 @@
 Toolz for debug, profile and trace of the distributed runtime :surfer:
 
 '''
-from tractor.devx import (
-    open_crash_handler as open_crash_handler,
-)
+from .debug import (
+    open_crash_handler,
+)
 from .profile import (
-    Profiler as Profiler,
-    pg_profile_enabled as pg_profile_enabled,
-    ms_slower_then as ms_slower_then,
-    timeit as timeit,
+    Profiler,
+    pg_profile_enabled,
+    ms_slower_then,
+    timeit,
 )
 
+# TODO: other mods to include?
+# - DROP .trionics, already moved into tractor
+# - move in `piker.calc`
+
+__all__: list[str] = [
+    'open_crash_handler',
+    'pg_profile_enabled',
+    'ms_slower_then',
+    'Profiler',
+    'timeit',
+]

View File

@@ -0,0 +1,40 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+'''
+Debugger wrappers for `pdbp` as used by `tractor`.
+
+'''
+from contextlib import contextmanager as cm
+
+import pdbp
+
+
+# TODO: better naming and what additionals?
+# - optional runtime plugging?
+# - detection for sync vs. async code?
+# - specialized REPL entry when in distributed mode?
+@cm
+def open_crash_handler():
+    '''
+    Super basic crash handler using `pdbp` debugger.
+
+    '''
+    try:
+        yield
+    except BaseException:
+        pdbp.xpm()
+        raise

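Usage of the added context manager amounts to wrapping any crash-prone entrypoint so an uncaught exception drops into a post-mortem `pdbp` REPL before re-raising (import path per the `piker.toolz` re-export shown earlier in this diff):

from piker.toolz import open_crash_handler

with open_crash_handler():
    # any uncaught exception here triggers `pdbp.xpm()` post-mortem
    raise RuntimeError('kaboom')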
View File

@@ -51,11 +51,10 @@ from pendulum import (
 import numpy as np
 import polars as pl
 
-from piker.brokers import NoData
-from piker.accounting import (
+from ..accounting import (
     MktPair,
 )
-from piker.data._util import (
+from ..data._util import (
     log,
 )
 from ..data._sharedmem import (
@@ -303,28 +302,16 @@ async def maybe_fill_null_segments(
             gap: np.ndarray = shm._array[istart:istop]
 
             # copy the oldest OHLC samples forward
-            cls: float = shm._array[istart]['close']
-
-            # TODO: how can we mark this range as being a gap tho?
-            # -[ ] maybe pg finally supports nulls in ndarray to
-            #     show empty space somehow?
-            # -[ ] we could put a special value in the vlm or
-            #     another col/field to denote?
-            gap[ohlc_fields] = cls
+            gap[ohlc_fields] = shm._array[istart]['close']
 
             start_t: float = shm._array[istart]['time']
             t_diff: float = (istop - istart)*timeframe
             gap['time'] = np.arange(
                 start=start_t,
                 stop=start_t + t_diff,
                 step=timeframe,
             )
 
-            # TODO: reimpl using the new `.ui._remote_ctl` ctx
-            # ideally using some kinda decent
-            # tractory-reverse-lookup-connnection from some other
-            # `Context` type thingy?
             await sampler_stream.send({
                 'broadcast_all': {
@@ -345,11 +332,11 @@ async def maybe_fill_null_segments(
     # parallel possible no matter the backend?
     # -[ ] fill algo: do queries in alternating "latest, then
     #     earliest, then latest.. etc?"
-    # await tractor.pause()
 
 
 async def start_backfill(
     get_hist,
-    frame_types: dict[str, Duration] | None,
     mod: ModuleType,
     mkt: MktPair,
     shm: ShmArray,
@@ -394,6 +381,7 @@ async def start_backfill(
             60: {'years': 6},
         }
         period_duration: int = periods[timeframe]
+
         update_start_on_prepend = True
 
         # NOTE: manually set the "latest" datetime which we intend to
@@ -427,28 +415,6 @@ async def start_backfill(
                     end_dt=last_start_dt,
                 )
 
-            except NoData as _daterr:
-                # 3 cases:
-                # - frame in the middle of a legit venue gap
-                # - history actually began at the `last_start_dt`
-                # - some other unknown error (ib blocking the
-                #   history bc they don't want you seeing how they
-                #   cucked all the tinas..)
-                if dur := frame_types.get(timeframe):
-                    # decrement by a frame's worth of duration and
-                    # retry a few times.
-                    last_start_dt.subtract(
-                        seconds=dur.total_seconds()
-                    )
-                    log.warning(
-                        f'{mod.name} -> EMPTY FRAME for end_dt?\n'
-                        f'tf@fqme: {timeframe}@{mkt.fqme}\n'
-                        'bf_until <- last_start_dt:\n'
-                        f'{backfill_until_dt} <- {last_start_dt}\n'
-                        f'Decrementing `end_dt` by {dur} and retry..\n'
-                    )
-                    continue
-
             # broker says there never was or is no more history to pull
             except DataUnavailable:
                 log.warning(
@@ -905,7 +871,7 @@ async def tsdb_backfill(
     ):
         log.info(
             f'`{mod}` history client returned backfill config:\n'
-            f'{pformat(config)}\n'
+            f'{config}\n'
         )
 
         dt_eps: list[DateTime, DateTime] = []
@@ -977,7 +943,6 @@ async def tsdb_backfill(
                 partial(
                     start_backfill,
                     get_hist=get_hist,
-                    frame_types=config.get('frame_types', None),
                     mod=mod,
                     mkt=mkt,
                     shm=shm,

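The null-segment fill on both sides copies the last good close forward into every OHLC field of the gap and rebuilds the epoch column with `np.arange`; a self-contained numpy sketch of the same repair (the dtype and sizes are illustrative):

import numpy as np

timeframe: float = 60.0
dt = np.dtype([('time', 'f8'), ('open', 'f8'), ('close', 'f8')])
arr = np.zeros(6, dtype=dt)
arr['time'] = np.arange(0, 360, timeframe)
arr['open'][:3] = arr['close'][:3] = [10., 11., 12.]
# rows 3..5 are a "null segment" (all zeros)

istart, istop = 2, 5
gap = arr[istart + 1:istop + 1]   # a view into the buffer

# flat-fill the price fields from the last valid sample
for field in ('open', 'close'):
    gap[field] = arr[istart]['close']

# rebuild the time column at the expected sample step
start_t = arr[istart]['time']
gap['time'] = np.arange(
    start=start_t + timeframe,
    stop=start_t + timeframe * (len(gap) + 1),
    step=timeframe,
)
print(arr)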
View File

@@ -21,16 +21,15 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
 types.
 
 '''
-from __future__ import annotations
 from collections import UserList
 from pprint import (
-    saferepr,
+    pformat,
 )
 from typing import Any
 
 from msgspec import (
     msgpack,
-    Struct as _Struct,
+    Struct,
     structs,
 )
 
@@ -63,7 +62,7 @@ class DiffDump(UserList):
 
 class Struct(
-    _Struct,
+    Struct,
 
     # https://jcristharif.com/msgspec/structs.html#tagged-unions
     # tag='pikerstruct',
@@ -73,27 +72,9 @@ class Struct(
     A "human friendlier" (aka repl buddy) struct subtype.
 
     '''
-    def _sin_props(self) -> Iterator[
-        tuple[
-            structs.FieldIinfo,
-            str,
-            Any,
-        ]
-    ]:
-        '''
-        Iterate over all non-@property fields of this struct.
-
-        '''
-        fi: structs.FieldInfo
-        for fi in structs.fields(self):
-            key: str = fi.name
-            val: Any = getattr(self, key)
-            yield fi, key, val
-
     def to_dict(
         self,
         include_non_members: bool = True,
 
     ) -> dict:
         '''
         Like it sounds.. direct delegation to:
@@ -109,72 +90,16 @@ class Struct(
         # only return a dict of the struct members
         # which were provided as input, NOT anything
-        # added as type-defined `@property` methods!
+        # added as `@properties`!
         sin_props: dict = {}
-        fi: structs.FieldInfo
-        for fi, k, v in self._sin_props():
-            sin_props[k] = asdict[k]
+        for fi in structs.fields(self):
+            key: str = fi.name
+            sin_props[key] = asdict[key]
 
         return sin_props
 
-    def pformat(
-        self,
-        field_indent: int = 2,
-        indent: int = 0,
-
-    ) -> str:
-        '''
-        Recursion-safe `pprint.pformat()` style formatting of
-        a `msgspec.Struct` for sane reading by a human using a REPL.
-
-        '''
-        # global whitespace indent
-        ws: str = ' '*indent
-
-        # field whitespace indent
-        field_ws: str = ' '*(field_indent + indent)
-
-        # qtn: str = ws + self.__class__.__qualname__
-        qtn: str = self.__class__.__qualname__
-
-        obj_str: str = ''  # accumulator
-        fi: structs.FieldInfo
-        k: str
-        v: Any
-        for fi, k, v in self._sin_props():
-
-            # TODO: how can we prefer `Literal['option1', 'option2,
-            # ..]` over .__name__ == `Literal` but still get only the
-            # latter for simple types like `str | int | None` etc..?
-            ft: type = fi.type
-            typ_name: str = getattr(ft, '__name__', str(ft))
-
-            # recurse to get sub-struct's `.pformat()` output Bo
-            if isinstance(v, Struct):
-                val_str: str = v.pformat(
-                    indent=field_indent + indent,
-                    field_indent=indent + field_indent,
-                )
-            else:  # the `pprint` recursion-safe format:
-                # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
-                val_str: str = saferepr(v)
-
-            obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
-
-        return (
-            f'{qtn}(\n'
-            f'{obj_str}'
-            f'{ws})'
-        )
-
-    # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
-    # inside a known tty?
-    # def __repr__(self) -> str:
-    #     ...
-
-    # __str__ = __repr__ = pformat
-    __repr__ = pformat
+    def pformat(self) -> str:
+        return f'Struct({pformat(self.to_dict())})'
 
     def copy(
         self,

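The removed `_sin_props()` helper iterates only the declared struct fields via `msgspec.structs.fields()` so that `@property` attributes never leak into the dict/pformat output; a compact sketch of that introspection:

from msgspec import Struct, structs

class Point(Struct):
    x: int
    y: int = 0

    @property
    def norm(self) -> int:  # NOT a field, should not serialize
        return abs(self.x) + abs(self.y)

p = Point(x=3, y=4)

# only declared members show up; the @property does not
for fi in structs.fields(p):
    print(fi.name, fi.type, getattr(p, fi.name))

print(structs.asdict(p))  # {'x': 3, 'y': 4}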
View File

@@ -50,16 +50,15 @@ from ._display import DisplayState
 from ._interaction import ChartView
 from ._editors import SelectRect
 from ._chart import ChartPlotWidget
-from ._dataviz import Viz
 
 log = get_logger(__name__)
 
-# NOTE: this is UPDATED by the `._display.graphics_update_loop()`
-# once all chart widgets / Viz per flume have been initialized
-# allowing for remote annotation (control) of any chart-actor's mkt
-# feed by fqme lookup Bo
-_dss: dict[str, DisplayState] = {}
+# NOTE: this is set by the `._display.graphics_update_loop()` once
+# all chart widgets / Viz per flume have been initialized allowing
+# for remote annotation (control) of any chart-actor's mkt feed by
+# fqme lookup Bo
+_dss: dict[str, DisplayState] | None = None
 
 # stash each and every client connection so that they can all
 # be cancelled on shutdown/error.
@@ -97,7 +96,7 @@ async def serve_rc_annots(
         async for msg in annot_req_stream:
             match msg:
                 case {
-                    'cmd': 'SelectRect',
+                    'annot': 'SelectRect',
                     'fqme': fqme,
                     'timeframe': timeframe,
                     'meth': str(meth),
@@ -111,6 +110,14 @@ async def serve_rc_annots(
                     }[timeframe]
                     cv: ChartView = chart.cv
 
+                    # sanity
+                    if timeframe == 60:
+                        assert (
+                            chart.linked.godwidget.hist_linked.chart.view
+                            is
+                            cv
+                        )
+
                     # annot type lookup from cmd
                     rect = SelectRect(
                         viewbox=cv,
@@ -136,8 +143,7 @@ async def serve_rc_annots(
                     await annot_req_stream.send(aid)
 
                 case {
-                    'cmd': 'remove',
-                    'aid': int(aid),
+                    'rm_annot': int(aid),
                 }:
                     # NOTE: this is normally entered on
                     # a client's annotation de-alloc normally
@@ -150,29 +156,19 @@ async def serve_rc_annots(
                     await annot_req_stream.send(aid)
 
                 case {
-                    'cmd': 'redraw',
                     'fqme': fqme,
+                    'render': int(aid),
+                    'viz_name': str(viz_name),
                     'timeframe': timeframe,
-                    # TODO: maybe more fields?
-                    # 'render': int(aid),
-                    # 'viz_name': str(viz_name),
                 }:
-                    # NOTE: old match from the 60s display loop task
                     # | {
                     #     'backfilling': (str(viz_name), timeframe),
                     # }:
-                    ds: DisplayState = _dss[fqme]
-                    viz: Viz = {
-                        60: ds.hist_viz,
-                        1: ds.viz,
+                    ds: DisplayState = _dss[viz_name]
+                    chart: ChartPlotWidget = {
+                        60: ds.hist_chart,
+                        1: ds.chart,
                     }[timeframe]
-                    log.warning(
-                        f'Forcing VIZ REDRAW:\n'
-                        f'fqme: {fqme}\n'
-                        f'timeframe: {timeframe}\n'
-                    )
-                    viz.reset_graphics()
 
                 case _:
                     log.error(
@@ -231,18 +227,6 @@ class AnnotCtl(Struct):
     # ids to their equivalent IPC msg-streams.
     _ipcs: dict[int, MsgStream] = {}
 
-    def _get_ipc(
-        self,
-        fqme: str,
-    ) -> MsgStream:
-        ipc: MsgStream = self.fqme2ipc.get(fqme)
-        if ipc is None:
-            raise SymbolNotFound(
-                'No chart (actor) seems to have mkt feed loaded?\n'
-                f'{fqme}'
-            )
-        return ipc
-
     async def add_rect(
         self,
         fqme: str,
@@ -262,10 +246,16 @@ class AnnotCtl(Struct):
         the instances `id(obj)` from the remote UI actor.
 
         '''
-        ipc: MsgStream = self._get_ipc(fqme)
+        ipc: MsgStream = self.fqme2ipc.get(fqme)
+        if ipc is None:
+            raise SymbolNotFound(
+                'No chart (actor) seems to have mkt feed loaded?\n'
+                f'{fqme}'
+            )
+
         await ipc.send({
             'fqme': fqme,
-            'cmd': 'SelectRect',
+            'annot': 'SelectRect',
             'timeframe': timeframe,
             # 'meth': str(meth),
             'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos',
@@ -298,8 +288,7 @@ class AnnotCtl(Struct):
         '''
         ipc: MsgStream = self._ipcs[aid]
         await ipc.send({
-            'cmd': 'remove',
-            'aid': aid,
+            'rm_annot': aid,
         })
         removed: bool = await ipc.receive()
         return removed
@@ -318,19 +307,6 @@ class AnnotCtl(Struct):
         finally:
             await self.remove(aid)
 
-    async def redraw(
-        self,
-        fqme: str,
-        timeframe: float,
-    ) -> None:
-        await self._get_ipc(fqme).send({
-            'cmd': 'redraw',
-            'fqme': fqme,
-            # 'render': int(aid),
-            # 'viz_name': str(viz_name),
-            'timeframe': timeframe,
-        })
-
     # TODO: do we even need this?
     # async def modify(
     # self,

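The request loop on both sides dispatches on message *shape* using mapping patterns, where `case {'key': capture}` matches any dict containing that key; a trimmed standalone sketch of the dispatch style (the message keys mirror the wire format above):

def handle(msg: dict) -> str:
    match msg:
        case {
            'annot': 'SelectRect',
            'fqme': str(fqme),
        }:
            return f'create rect on {fqme}'

        case {'rm_annot': int(aid)}:
            return f'remove annotation {aid}'

        case _:
            return 'unknown msg'

print(handle({'annot': 'SelectRect', 'fqme': 'btcusdt.binance'}))
print(handle({'rm_annot': 42}))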
View File

@@ -358,7 +358,7 @@ class OrderMode:
         send_msg: bool = True,
         order: Order | None = None,
 
-    ) -> Dialog | None:
+    ) -> Dialog:
         '''
         Send execution order to EMS return a level line to
         represent the order on a chart.
@@ -378,16 +378,6 @@ class OrderMode:
                 'oid': oid,
             })
 
-        if order.price <= 0:
-            log.error(
-                '*!? Invalid `Order.price <= 0` ?!*\n'
-                # TODO: make this present multi-line in object form
-                # like `ib_insync.contracts.Contract.__repr__()`
-                f'{order}\n'
-            )
-            self.cancel_orders([order.oid])
-            return None
-
         lines = self.lines_from_order(
             order,
             show_markers=True,
@@ -673,7 +663,7 @@ class OrderMode:
         self,
         msg: Status,
 
-    ) -> Dialog | None:
+    ) -> Dialog:
         # NOTE: the `.order` attr **must** be set with the
         # equivalent order msg in order to be loaded.
         order = msg.req
@@ -704,15 +694,12 @@ class OrderMode:
             fqsn=fqme,
             info={},
         )
-        maybe_dialog: Dialog | None = self.submit_order(
+        dialog = self.submit_order(
             send_msg=False,
             order=order,
         )
-        if maybe_dialog is None:
-            return None
-
-        assert self.dialogs[oid] == maybe_dialog
-        return maybe_dialog
+        assert self.dialogs[oid] == dialog
+        return dialog
 
     @asynccontextmanager
@@ -1092,24 +1079,7 @@ async def process_trade_msg(
                     )
                 ):
                     msg.req = order
-                    dialog: (
-                        Dialog
-                        # NOTE: on an invalid order submission (eg.
-                        # price <=0) the downstream APIs may return
-                        # a null.
-                        | None
-                    ) = mode.load_unknown_dialog_from_msg(msg)
-
-                    # cancel any invalid pre-existing order!
-                    if dialog is None:
-                        log.warning(
-                            'Order was ignored/invalid?\n'
-                            f'{order}'
-                        )
-
-                    # if valid, display the order line the same as if
-                    # it was submitted during this UI session.
-                    else:
-                        mode.on_submit(oid)
+                    dialog = mode.load_unknown_dialog_from_msg(msg)
+                    mode.on_submit(oid)
 
         case Status(resp='error'):
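One side of this diff makes `submit_order()` return `Dialog | None` so an invalid order (e.g. non-positive price) can be rejected without raising, forcing callers to guard the result; a minimal sketch of that validate-then-bail shape (the dataclass types are illustrative stand-ins):

from dataclasses import dataclass

@dataclass
class Order:
    oid: str
    price: float

@dataclass
class Dialog:
    oid: str

def submit_order(order: Order) -> Dialog | None:
    if order.price <= 0:
        # reject instead of raising; the caller must check for None
        print(f'*!? Invalid `Order.price <= 0` ?!*\n{order}')
        return None
    return Dialog(oid=order.oid)

dialog = submit_order(Order(oid='abc123', price=-1.0))
if dialog is None:
    print('order was ignored/invalid')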