Compare commits
14 Commits
9be29a707d
...
3d03781810
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | 3d03781810 | |
Tyler Goodlet | 83d1f117a8 | |
Tyler Goodlet | e4ce79f720 | |
Tyler Goodlet | 264246d89b | |
Tyler Goodlet | 7c96c9fafe | |
Tyler Goodlet | 52b349fe79 | |
Tyler Goodlet | 6959429af8 | |
Tyler Goodlet | 05f874001a | |
Tyler Goodlet | fc216d37de | |
Tyler Goodlet | 03e429abf8 | |
Tyler Goodlet | 7ae7cc829f | |
Tyler Goodlet | b23d44e21a | |
Tyler Goodlet | 2669db785c | |
Tyler Goodlet | d3e7b5cd0e |
|
@ -201,6 +201,7 @@ class FutesPair(Pair):
|
|||
match contype:
|
||||
case (
|
||||
'CURRENT_QUARTER'
|
||||
| 'CURRENT_QUARTER DELIVERING'
|
||||
| 'NEXT_QUARTER' # su madre binance..
|
||||
):
|
||||
pair, _, expiry = symbol.partition('_')
|
||||
|
@ -220,6 +221,10 @@ class FutesPair(Pair):
|
|||
case ['DEFI']:
|
||||
return 'PERP'
|
||||
|
||||
# wow, just wow you binance guys suck..
|
||||
if self.status == 'PENDING_TRADING':
|
||||
return 'PENDING'
|
||||
|
||||
# XXX: yeah no clue then..
|
||||
raise ValueError(
|
||||
f'Bad .expiry token match: {contype} for {symbol}'
|
||||
|
@ -237,6 +242,7 @@ class FutesPair(Pair):
|
|||
|
||||
case (
|
||||
'CURRENT_QUARTER'
|
||||
| 'CURRENT_QUARTER DELIVERING'
|
||||
| 'NEXT_QUARTER' # su madre binance..
|
||||
):
|
||||
_, _, expiry = symbol.partition('_')
|
||||
|
@ -249,7 +255,10 @@ class FutesPair(Pair):
|
|||
return f'{margin}M'
|
||||
|
||||
match subtype:
|
||||
case ['DEFI']:
|
||||
case (
|
||||
['DEFI']
|
||||
| ['USDC']
|
||||
):
|
||||
return f'{subtype[0]}'
|
||||
|
||||
# XXX: yeah no clue then..
|
||||
|
|
|
@ -482,7 +482,7 @@ def search(
|
|||
):
|
||||
return await func()
|
||||
|
||||
from cornerboi._debug import open_crash_handler
|
||||
from piker.toolz import open_crash_handler
|
||||
with open_crash_handler():
|
||||
quotes = trio.run(
|
||||
main,
|
||||
|
@ -506,9 +506,11 @@ def search(
|
|||
@click.option('--delete', '-d', flag_value=True, help='Delete section')
|
||||
@click.pass_obj
|
||||
def brokercfg(config, section, value, delete):
|
||||
"""If invoked with no arguments, open an editor to edit broker configs file
|
||||
or get / update an individual section.
|
||||
"""
|
||||
'''
|
||||
If invoked with no arguments, open an editor to edit broker
|
||||
configs file or get / update an individual section.
|
||||
|
||||
'''
|
||||
from .. import config
|
||||
|
||||
if section:
|
||||
|
|
|
@ -434,11 +434,8 @@ async def get_bars(
|
|||
# current impl) to detect a cancel case.
|
||||
# timeout=timeout,
|
||||
)
|
||||
|
||||
# not enough bars signal, likely due to venue
|
||||
# operational gaps.
|
||||
# too_little: bool = False
|
||||
if end_dt:
|
||||
# usually either a request during a venue closure
|
||||
# or into a large (weekend) closure gap.
|
||||
if not bars:
|
||||
# no data returned?
|
||||
log.warning(
|
||||
|
@ -447,14 +444,13 @@ async def get_bars(
|
|||
f'end_dt: {end_dt}\n'
|
||||
f'duration: {dt_duration}\n'
|
||||
)
|
||||
# NOTE: REQUIRED to pass back value..
|
||||
result = None
|
||||
return None
|
||||
# raise NoData(
|
||||
# f'{fqme}\n'
|
||||
# f'end_dt:{end_dt}\n'
|
||||
# )
|
||||
|
||||
else:
|
||||
# not enough bars signal, likely due to venue
|
||||
# operational gaps.
|
||||
if end_dt:
|
||||
dur_s: float = len(bars) * timeframe
|
||||
bars_dur = Duration(seconds=dur_s)
|
||||
dt_dur_s: float = dt_duration.in_seconds()
|
||||
|
|
|
@ -70,12 +70,18 @@ _symbol_info_translation: dict[str, str] = {
|
|||
|
||||
|
||||
def get_config() -> dict[str, Any]:
|
||||
'''
|
||||
Load our section from `piker/brokers.toml`.
|
||||
|
||||
conf, path = config.load()
|
||||
section = conf.get('kraken')
|
||||
|
||||
if section is None:
|
||||
log.warning(f'No config section found for kraken in {path}')
|
||||
'''
|
||||
conf, path = config.load(
|
||||
conf_name='brokers',
|
||||
touch_if_dne=True,
|
||||
)
|
||||
if (section := conf.get('kraken')) is None:
|
||||
log.warning(
|
||||
f'No config section found for kraken in {path}'
|
||||
)
|
||||
return {}
|
||||
|
||||
return section
|
||||
|
|
|
@ -26,6 +26,7 @@ from contextlib import asynccontextmanager as acm
|
|||
from datetime import datetime
|
||||
from operator import itemgetter
|
||||
import itertools
|
||||
from pprint import pformat
|
||||
import time
|
||||
from typing import (
|
||||
Callable,
|
||||
|
@ -697,7 +698,12 @@ async def open_trade_dialog(
|
|||
# sanity check all the mkt infos
|
||||
for fqme, flume in feed.flumes.items():
|
||||
mkt: MktPair = symcache.mktmaps.get(fqme) or mkt_by_fqme[fqme]
|
||||
assert mkt == flume.mkt
|
||||
if mkt != flume.mkt:
|
||||
diff: tuple = mkt - flume.mkt
|
||||
log.warning(
|
||||
'MktPair sig mismatch?\n'
|
||||
f'{pformat(diff)}'
|
||||
)
|
||||
|
||||
get_cost: Callable = getattr(
|
||||
brokermod,
|
||||
|
|
|
@ -14,49 +14,45 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
Actor-runtime service orchestration machinery.
|
||||
'''
|
||||
Actor runtime primtives and (distributed) service APIs for,
|
||||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
- daemon-service mgmt: `_daemon` (i.e. low-level spawn and supervise machinery
|
||||
for sub-actors like `brokerd`, `emsd`, datad`, etc.)
|
||||
|
||||
from ._mngr import Services
|
||||
from ._registry import ( # noqa
|
||||
_tractor_kwargs,
|
||||
_default_reg_addr,
|
||||
_default_registry_host,
|
||||
_default_registry_port,
|
||||
open_registry,
|
||||
find_service,
|
||||
check_for_service,
|
||||
- service-actor supervision (via `trio` tasks) API: `._mngr`
|
||||
|
||||
- discovery interface (via light wrapping around `tractor`'s built-in
|
||||
prot): `._registry`
|
||||
|
||||
- `docker` cntr SC supervision for use with `trio`: `_ahab`
|
||||
- wrappers for marketstore and elasticsearch dbs
|
||||
=> TODO: maybe to (re)move elsewhere?
|
||||
|
||||
'''
|
||||
from ._mngr import Services as Services
|
||||
from ._registry import (
|
||||
_tractor_kwargs as _tractor_kwargs,
|
||||
_default_reg_addr as _default_reg_addr,
|
||||
_default_registry_host as _default_registry_host,
|
||||
_default_registry_port as _default_registry_port,
|
||||
|
||||
open_registry as open_registry,
|
||||
find_service as find_service,
|
||||
check_for_service as check_for_service,
|
||||
)
|
||||
from ._daemon import ( # noqa
|
||||
maybe_spawn_daemon,
|
||||
spawn_emsd,
|
||||
maybe_open_emsd,
|
||||
from ._daemon import (
|
||||
maybe_spawn_daemon as maybe_spawn_daemon,
|
||||
spawn_emsd as spawn_emsd,
|
||||
maybe_open_emsd as maybe_open_emsd,
|
||||
)
|
||||
from ._actor_runtime import (
|
||||
open_piker_runtime,
|
||||
maybe_open_pikerd,
|
||||
open_pikerd,
|
||||
get_runtime_vars,
|
||||
open_piker_runtime as open_piker_runtime,
|
||||
maybe_open_pikerd as maybe_open_pikerd,
|
||||
open_pikerd as open_pikerd,
|
||||
get_runtime_vars as get_runtime_vars,
|
||||
)
|
||||
from ..brokers._daemon import (
|
||||
spawn_brokerd,
|
||||
maybe_spawn_brokerd,
|
||||
spawn_brokerd as spawn_brokerd,
|
||||
maybe_spawn_brokerd as maybe_spawn_brokerd,
|
||||
)
|
||||
|
||||
|
||||
__all__ = [
|
||||
'check_for_service',
|
||||
'Services',
|
||||
'maybe_spawn_daemon',
|
||||
'spawn_brokerd',
|
||||
'maybe_spawn_brokerd',
|
||||
'spawn_emsd',
|
||||
'maybe_open_emsd',
|
||||
'open_piker_runtime',
|
||||
'maybe_open_pikerd',
|
||||
'open_pikerd',
|
||||
'get_runtime_vars',
|
||||
]
|
||||
|
|
|
@ -100,7 +100,7 @@ async def open_piker_runtime(
|
|||
or [_default_reg_addr]
|
||||
)
|
||||
|
||||
if ems := tractor_kwargs.get('enable_modules'):
|
||||
if ems := tractor_kwargs.pop('enable_modules', None):
|
||||
# import pdbp; pdbp.set_trace()
|
||||
enable_modules.extend(ems)
|
||||
|
||||
|
@ -175,14 +175,20 @@ async def open_pikerd(
|
|||
alive underling services (see below).
|
||||
|
||||
'''
|
||||
# NOTE: for the root daemon we always enable the root
|
||||
# mod set and we `list.extend()` it into wtv the
|
||||
# caller requested.
|
||||
# TODO: make this mod set more strict?
|
||||
# -[ ] eventually we should be able to avoid
|
||||
# having the root have more then permissions to spawn other
|
||||
# specialized daemons I think?
|
||||
ems: list[str] = kwargs.setdefault('enable_modules', [])
|
||||
ems.extend(_root_modules)
|
||||
|
||||
async with (
|
||||
open_piker_runtime(
|
||||
|
||||
name=_root_dname,
|
||||
# TODO: eventually we should be able to avoid
|
||||
# having the root have more then permissions to
|
||||
# spawn other specialized daemons I think?
|
||||
enable_modules=_root_modules,
|
||||
loglevel=loglevel,
|
||||
debug_mode=debug_mode,
|
||||
registry_addrs=registry_addrs,
|
||||
|
|
|
@ -15,8 +15,8 @@
|
|||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Supervisor for ``docker`` with included async and SC wrapping
|
||||
to ensure a cancellable container lifetime system.
|
||||
Supervisor for ``docker`` with included async and SC wrapping to
|
||||
ensure a cancellable container lifetime system.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
|
|
@ -27,6 +27,12 @@ from typing import (
|
|||
import trio
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor import (
|
||||
current_actor,
|
||||
ContextCancelled,
|
||||
Context,
|
||||
Portal,
|
||||
)
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
@ -38,6 +44,8 @@ from ._util import (
|
|||
# library.
|
||||
# - wrap a "remote api" wherein you can get a method proxy
|
||||
# to the pikerd actor for starting services remotely!
|
||||
# - prolly rename this to ActorServicesNursery since it spawns
|
||||
# new actors and supervises them to completion?
|
||||
class Services:
|
||||
|
||||
actor_n: tractor._supervise.ActorNursery
|
||||
|
@ -47,7 +55,7 @@ class Services:
|
|||
str,
|
||||
tuple[
|
||||
trio.CancelScope,
|
||||
tractor.Portal,
|
||||
Portal,
|
||||
trio.Event,
|
||||
]
|
||||
] = {}
|
||||
|
@ -57,12 +65,12 @@ class Services:
|
|||
async def start_service_task(
|
||||
self,
|
||||
name: str,
|
||||
portal: tractor.Portal,
|
||||
portal: Portal,
|
||||
target: Callable,
|
||||
allow_overruns: bool = False,
|
||||
**ctx_kwargs,
|
||||
|
||||
) -> (trio.CancelScope, tractor.Context):
|
||||
) -> (trio.CancelScope, Context):
|
||||
'''
|
||||
Open a context in a service sub-actor, add to a stack
|
||||
that gets unwound at ``pikerd`` teardown.
|
||||
|
@ -101,13 +109,30 @@ class Services:
|
|||
# wait on any context's return value
|
||||
# and any final portal result from the
|
||||
# sub-actor.
|
||||
ctx_res = await ctx.result()
|
||||
ctx_res: Any = await ctx.result()
|
||||
|
||||
# NOTE: blocks indefinitely until cancelled
|
||||
# either by error from the target context
|
||||
# function or by being cancelled here by the
|
||||
# surrounding cancel scope.
|
||||
return (await portal.result(), ctx_res)
|
||||
except ContextCancelled as ctxe:
|
||||
canceller: tuple[str, str] = ctxe.canceller
|
||||
our_uid: tuple[str, str] = current_actor().uid
|
||||
if (
|
||||
canceller != portal.channel.uid
|
||||
and
|
||||
canceller != our_uid
|
||||
):
|
||||
log.cancel(
|
||||
f'Actor-service {name} was remotely cancelled?\n'
|
||||
f'remote canceller: {canceller}\n'
|
||||
f'Keeping {our_uid} alive, ignoring sub-actor cancel..\n'
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
|
||||
finally:
|
||||
await portal.cancel_actor()
|
||||
|
|
|
@ -27,6 +27,7 @@ from typing import (
|
|||
)
|
||||
|
||||
import tractor
|
||||
from tractor import Portal
|
||||
|
||||
from ._util import (
|
||||
log, # sub-sys logger
|
||||
|
@ -140,7 +141,11 @@ async def find_service(
|
|||
|
||||
first_only: bool = True,
|
||||
|
||||
) -> tractor.Portal | None:
|
||||
) -> (
|
||||
Portal
|
||||
| list[Portal]
|
||||
| None
|
||||
):
|
||||
|
||||
reg_addrs: list[tuple[str, int]]
|
||||
async with open_registry(
|
||||
|
@ -153,6 +158,9 @@ async def find_service(
|
|||
),
|
||||
) as reg_addrs:
|
||||
log.info(f'Scanning for service `{service_name}`')
|
||||
|
||||
maybe_portals: list[Portal] | Portal | None
|
||||
|
||||
# attach to existing daemon by name if possible
|
||||
async with tractor.find_actor(
|
||||
service_name,
|
||||
|
|
|
@ -24,8 +24,13 @@ from __future__ import annotations
|
|||
# AsyncExitStack,
|
||||
# )
|
||||
from pathlib import Path
|
||||
from math import copysign
|
||||
import time
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
Any,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import polars as pl
|
||||
import numpy as np
|
||||
|
@ -42,15 +47,17 @@ from piker.data import (
|
|||
ShmArray,
|
||||
)
|
||||
from piker import tsp
|
||||
from . import (
|
||||
log,
|
||||
)
|
||||
from piker.data._formatters import BGM
|
||||
from . import log
|
||||
from . import (
|
||||
__tsdbs__,
|
||||
open_storage_client,
|
||||
StorageClient,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from piker.ui._remote_ctl import AnnotCtl
|
||||
|
||||
|
||||
store = typer.Typer()
|
||||
|
||||
|
@ -203,10 +210,12 @@ def anal(
|
|||
deduped: pl.DataFrame # deduplicated dts
|
||||
(
|
||||
df,
|
||||
gaps,
|
||||
deduped,
|
||||
diff,
|
||||
) = tsp.dedupe(shm_df)
|
||||
) = tsp.dedupe(
|
||||
shm_df,
|
||||
period=period,
|
||||
)
|
||||
|
||||
write_edits: bool = True
|
||||
if (
|
||||
|
@ -217,7 +226,6 @@ def anal(
|
|||
)
|
||||
):
|
||||
await tractor.pause()
|
||||
|
||||
await client.write_ohlcv(
|
||||
fqme,
|
||||
ohlcv=deduped,
|
||||
|
@ -229,10 +237,122 @@ def anal(
|
|||
# is there something more minimal but nearly as
|
||||
# functional as ipython?
|
||||
await tractor.pause()
|
||||
assert not null_segs
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
||||
async def markup_gaps(
|
||||
fqme: str,
|
||||
timeframe: float,
|
||||
actl: AnnotCtl,
|
||||
wdts: pl.DataFrame,
|
||||
gaps: pl.DataFrame,
|
||||
|
||||
) -> dict[int, dict]:
|
||||
'''
|
||||
Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
|
||||
with rectangles.
|
||||
|
||||
'''
|
||||
aids: dict[int] = {}
|
||||
for i in range(gaps.height):
|
||||
|
||||
row: pl.DataFrame = gaps[i]
|
||||
|
||||
# the gap's RIGHT-most bar's OPEN value
|
||||
# at that time (sample) step.
|
||||
iend: int = row['index'][0]
|
||||
# dt: datetime = row['dt'][0]
|
||||
# dt_prev: datetime = row['dt_prev'][0]
|
||||
# dt_end_t: float = dt.timestamp()
|
||||
|
||||
|
||||
# TODO: can we eventually remove this
|
||||
# once we figure out why the epoch cols
|
||||
# don't match?
|
||||
# TODO: FIX HOW/WHY these aren't matching
|
||||
# and are instead off by 4hours (EST
|
||||
# vs. UTC?!?!)
|
||||
# end_t: float = row['time']
|
||||
# assert (
|
||||
# dt.timestamp()
|
||||
# ==
|
||||
# end_t
|
||||
# )
|
||||
|
||||
# the gap's LEFT-most bar's CLOSE value
|
||||
# at that time (sample) step.
|
||||
prev_r: pl.DataFrame = wdts.filter(
|
||||
pl.col('index') == iend - 1
|
||||
)
|
||||
# XXX: probably a gap in the (newly sorted or de-duplicated)
|
||||
# dt-df, so we might need to re-index first..
|
||||
if prev_r.is_empty():
|
||||
await tractor.pause()
|
||||
|
||||
istart: int = prev_r['index'][0]
|
||||
# dt_start_t: float = dt_prev.timestamp()
|
||||
|
||||
# start_t: float = prev_r['time']
|
||||
# assert (
|
||||
# dt_start_t
|
||||
# ==
|
||||
# start_t
|
||||
# )
|
||||
|
||||
# TODO: implement px-col width measure
|
||||
# and ensure at least as many px-cols
|
||||
# shown per rect as configured by user.
|
||||
# gap_w: float = abs((iend - istart))
|
||||
# if gap_w < 6:
|
||||
# margin: float = 6
|
||||
# iend += margin
|
||||
# istart -= margin
|
||||
|
||||
rect_gap: float = BGM*3/8
|
||||
opn: float = row['open'][0]
|
||||
ro: tuple[float, float] = (
|
||||
# dt_end_t,
|
||||
iend + rect_gap + 1,
|
||||
opn,
|
||||
)
|
||||
cls: float = prev_r['close'][0]
|
||||
lc: tuple[float, float] = (
|
||||
# dt_start_t,
|
||||
istart - rect_gap, # + 1 ,
|
||||
cls,
|
||||
)
|
||||
|
||||
color: str = 'dad_blue'
|
||||
diff: float = cls - opn
|
||||
sgn: float = copysign(1, diff)
|
||||
color: str = {
|
||||
-1: 'buy_green',
|
||||
1: 'sell_red',
|
||||
}[sgn]
|
||||
|
||||
rect_kwargs: dict[str, Any] = dict(
|
||||
fqme=fqme,
|
||||
timeframe=timeframe,
|
||||
start_pos=lc,
|
||||
end_pos=ro,
|
||||
color=color,
|
||||
)
|
||||
|
||||
aid: int = await actl.add_rect(**rect_kwargs)
|
||||
assert aid
|
||||
aids[aid] = rect_kwargs
|
||||
|
||||
# tell chart to redraw all its
|
||||
# graphics view layers Bo
|
||||
await actl.redraw(
|
||||
fqme=fqme,
|
||||
timeframe=timeframe,
|
||||
)
|
||||
return aids
|
||||
|
||||
|
||||
@store.command()
|
||||
def ldshm(
|
||||
fqme: str,
|
||||
|
@ -249,7 +369,6 @@ def ldshm(
|
|||
async def main():
|
||||
from piker.ui._remote_ctl import (
|
||||
open_annot_ctl,
|
||||
AnnotCtl,
|
||||
)
|
||||
actl: AnnotCtl
|
||||
mod: ModuleType
|
||||
|
@ -274,111 +393,89 @@ def ldshm(
|
|||
shm_df,
|
||||
) in tsp.iter_dfs_from_shms(fqme):
|
||||
|
||||
# compute ohlc properties for naming
|
||||
times: np.ndarray = shm.array['time']
|
||||
period_s: float = float(times[-1] - times[-2])
|
||||
if period_s < 1.:
|
||||
d1: float = float(times[-1] - times[-2])
|
||||
d2: float = float(times[-2] - times[-3])
|
||||
med: float = np.median(np.diff(times))
|
||||
if (
|
||||
d1 < 1.
|
||||
and d2 < 1.
|
||||
and med < 1.
|
||||
):
|
||||
raise ValueError(
|
||||
f'Something is wrong with time period for {shm}:\n{times}'
|
||||
)
|
||||
|
||||
# over-write back to shm?
|
||||
df: pl.DataFrame # with dts
|
||||
deduped: pl.DataFrame # deduplicated dts
|
||||
(
|
||||
df,
|
||||
gaps,
|
||||
deduped,
|
||||
diff,
|
||||
) = tsp.dedupe(shm_df)
|
||||
period_s: float = float(max(d1, d2, med))
|
||||
|
||||
null_segs: tuple = tsp.get_null_segs(
|
||||
frame=shm.array,
|
||||
period=period_s,
|
||||
)
|
||||
|
||||
needs_correction: bool = (
|
||||
not gaps.is_empty()
|
||||
or null_segs
|
||||
)
|
||||
# TODO: maybe only optionally enter this depending
|
||||
# on some CLI flags and/or gap detection?
|
||||
if needs_correction:
|
||||
for i in range(gaps.height):
|
||||
row: pl.DataFrame = gaps[i]
|
||||
# TODO: call null-seg fixer somehow?
|
||||
if null_segs:
|
||||
await tractor.pause()
|
||||
# async with (
|
||||
# trio.open_nursery() as tn,
|
||||
# mod.open_history_client(
|
||||
# mkt,
|
||||
# ) as (get_hist, config),
|
||||
# ):
|
||||
# nulls_detected: trio.Event = await tn.start(partial(
|
||||
# tsp.maybe_fill_null_segments,
|
||||
|
||||
# TODO: can we eventually remove this
|
||||
# once we figure out why the epoch cols
|
||||
# don't match?
|
||||
iend: int = row['index'][0]
|
||||
# dt: datetime = row['dt'][0]
|
||||
# dt_prev: datetime = row['dt_prev'][0]
|
||||
# shm=shm,
|
||||
# timeframe=timeframe,
|
||||
# get_hist=get_hist,
|
||||
# sampler_stream=sampler_stream,
|
||||
# mkt=mkt,
|
||||
# ))
|
||||
|
||||
# the gap's right-most bar's OPEN value
|
||||
# at that time (sample) step.
|
||||
# dt_end_t: float = dt.timestamp()
|
||||
|
||||
# TODO: FIX HOW/WHY these aren't matching
|
||||
# and are instead off by 4hours (EST
|
||||
# vs. UTC?!?!)
|
||||
# end_t: float = row['time']
|
||||
# assert (
|
||||
# dt.timestamp()
|
||||
# ==
|
||||
# end_t
|
||||
# )
|
||||
|
||||
# the gap's left-most bar's CLOSE value
|
||||
# at that time (sample) step.
|
||||
prev_r: pl.DataFrame = df.filter(
|
||||
pl.col('index') == iend - 1
|
||||
)
|
||||
istart: int = prev_r['index'][0]
|
||||
# dt_start_t: float = dt_prev.timestamp()
|
||||
|
||||
# start_t: float = prev_r['time']
|
||||
# assert (
|
||||
# dt_start_t
|
||||
# ==
|
||||
# start_t
|
||||
# )
|
||||
|
||||
# TODO: implement px-col width measure
|
||||
# and ensure at least as many px-cols
|
||||
# shown per rect as configured by user.
|
||||
gap_w: float = abs((iend - istart))
|
||||
if gap_w < 6:
|
||||
margin: float = 6
|
||||
iend += margin
|
||||
istart -= margin
|
||||
|
||||
ro: tuple[float, float] = (
|
||||
# dt_end_t,
|
||||
iend,
|
||||
row['open'][0],
|
||||
)
|
||||
lc: tuple[float, float] = (
|
||||
# dt_start_t,
|
||||
istart,
|
||||
prev_r['close'][0],
|
||||
# over-write back to shm?
|
||||
wdts: pl.DataFrame # with dts
|
||||
deduped: pl.DataFrame # deduplicated dts
|
||||
(
|
||||
wdts,
|
||||
deduped,
|
||||
diff,
|
||||
) = tsp.dedupe(
|
||||
shm_df,
|
||||
period=period_s,
|
||||
)
|
||||
|
||||
# async with actl.open_rect(
|
||||
# ) as aid:
|
||||
aid: int = await actl.add_rect(
|
||||
fqme=fqme,
|
||||
timeframe=period_s,
|
||||
start_pos=lc,
|
||||
end_pos=ro,
|
||||
# detect gaps from in expected (uniform OHLC) sample period
|
||||
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
|
||||
deduped,
|
||||
expect_period=period_s,
|
||||
)
|
||||
assert aid
|
||||
|
||||
# write to parquet file?
|
||||
# TODO: by default we always want to mark these up
|
||||
# with rects showing up/down gaps Bo
|
||||
venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
|
||||
deduped,
|
||||
expect_period=period_s,
|
||||
|
||||
# TODO: actually pull the exact duration
|
||||
# expected for each venue operational period?
|
||||
gap_dt_unit='days',
|
||||
gap_thresh=1,
|
||||
)
|
||||
|
||||
# TODO: find the disjoint set of step gaps from
|
||||
# venue (closure) set!
|
||||
# -[ ] do a set diff by checking for the unique
|
||||
# gap set only in the step_gaps?
|
||||
if (
|
||||
write_parquet
|
||||
not venue_gaps.is_empty()
|
||||
or (
|
||||
period_s < 60
|
||||
and not step_gaps.is_empty()
|
||||
)
|
||||
):
|
||||
# write to fs
|
||||
start = time.time()
|
||||
# write repaired ts to parquet-file?
|
||||
if write_parquet:
|
||||
start: float = time.time()
|
||||
path: Path = await client.write_ohlcv(
|
||||
fqme,
|
||||
ohlcv=deduped,
|
||||
|
@ -390,7 +487,7 @@ def ldshm(
|
|||
)
|
||||
|
||||
# read back from fs
|
||||
start = time.time()
|
||||
start: float = time.time()
|
||||
read_df: pl.DataFrame = pl.read_parquet(path)
|
||||
read_delay: float = round(
|
||||
time.time() - start,
|
||||
|
@ -419,17 +516,32 @@ def ldshm(
|
|||
update_first=False, # don't update ._first
|
||||
)
|
||||
|
||||
do_markup_gaps: bool = True
|
||||
if do_markup_gaps:
|
||||
new_df: pl.DataFrame = tsp.np2pl(new)
|
||||
aids: dict = await markup_gaps(
|
||||
fqme,
|
||||
period_s,
|
||||
actl,
|
||||
new_df,
|
||||
step_gaps,
|
||||
)
|
||||
|
||||
# last chance manual overwrites in REPL
|
||||
await tractor.pause()
|
||||
assert diff
|
||||
assert aids
|
||||
|
||||
else:
|
||||
# allow interaction even when no ts problems.
|
||||
await tractor.pause()
|
||||
assert not diff
|
||||
# assert not diff
|
||||
|
||||
|
||||
if df is None:
|
||||
log.error(f'No matching shm buffers for {fqme} ?')
|
||||
if shm_df is None:
|
||||
log.error(
|
||||
f'No matching shm buffers for {fqme} ?'
|
||||
|
||||
)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
|
|
@ -18,24 +18,12 @@
|
|||
Toolz for debug, profile and trace of the distributed runtime :surfer:
|
||||
|
||||
'''
|
||||
from .debug import (
|
||||
open_crash_handler,
|
||||
from tractor.devx import (
|
||||
open_crash_handler as open_crash_handler,
|
||||
)
|
||||
from .profile import (
|
||||
Profiler,
|
||||
pg_profile_enabled,
|
||||
ms_slower_then,
|
||||
timeit,
|
||||
Profiler as Profiler,
|
||||
pg_profile_enabled as pg_profile_enabled,
|
||||
ms_slower_then as ms_slower_then,
|
||||
timeit as timeit,
|
||||
)
|
||||
|
||||
# TODO: other mods to include?
|
||||
# - DROP .trionics, already moved into tractor
|
||||
# - move in `piker.calc`
|
||||
|
||||
__all__: list[str] = [
|
||||
'open_crash_handler',
|
||||
'pg_profile_enabled',
|
||||
'ms_slower_then',
|
||||
'Profiler',
|
||||
'timeit',
|
||||
]
|
||||
|
|
|
@ -1,40 +0,0 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Debugger wrappers for `pdbp` as used by `tractor`.
|
||||
|
||||
'''
|
||||
from contextlib import contextmanager as cm
|
||||
|
||||
import pdbp
|
||||
|
||||
|
||||
# TODO: better naming and what additionals?
|
||||
# - optional runtime plugging?
|
||||
# - detection for sync vs. async code?
|
||||
# - specialized REPL entry when in distributed mode?
|
||||
@cm
|
||||
def open_crash_handler():
|
||||
'''
|
||||
Super basic crash handler using `pdbp` debugger.
|
||||
|
||||
'''
|
||||
try:
|
||||
yield
|
||||
except BaseException:
|
||||
pdbp.xpm()
|
||||
raise
|
|
@ -51,10 +51,11 @@ from pendulum import (
|
|||
import numpy as np
|
||||
import polars as pl
|
||||
|
||||
from ..accounting import (
|
||||
from piker.brokers import NoData
|
||||
from piker.accounting import (
|
||||
MktPair,
|
||||
)
|
||||
from ..data._util import (
|
||||
from piker.data._util import (
|
||||
log,
|
||||
)
|
||||
from ..data._sharedmem import (
|
||||
|
@ -302,16 +303,28 @@ async def maybe_fill_null_segments(
|
|||
gap: np.ndarray = shm._array[istart:istop]
|
||||
|
||||
# copy the oldest OHLC samples forward
|
||||
gap[ohlc_fields] = shm._array[istart]['close']
|
||||
cls: float = shm._array[istart]['close']
|
||||
|
||||
# TODO: how can we mark this range as being a gap tho?
|
||||
# -[ ] maybe pg finally supports nulls in ndarray to
|
||||
# show empty space somehow?
|
||||
# -[ ] we could put a special value in the vlm or
|
||||
# another col/field to denote?
|
||||
gap[ohlc_fields] = cls
|
||||
|
||||
start_t: float = shm._array[istart]['time']
|
||||
t_diff: float = (istop - istart)*timeframe
|
||||
|
||||
gap['time'] = np.arange(
|
||||
start=start_t,
|
||||
stop=start_t + t_diff,
|
||||
step=timeframe,
|
||||
)
|
||||
|
||||
# TODO: reimpl using the new `.ui._remote_ctl` ctx
|
||||
# ideally using some kinda decent
|
||||
# tractory-reverse-lookup-connnection from some other
|
||||
# `Context` type thingy?
|
||||
await sampler_stream.send({
|
||||
'broadcast_all': {
|
||||
|
||||
|
@ -332,11 +345,11 @@ async def maybe_fill_null_segments(
|
|||
# parallel possible no matter the backend?
|
||||
# -[ ] fill algo: do queries in alternating "latest, then
|
||||
# earliest, then latest.. etc?"
|
||||
# await tractor.pause()
|
||||
|
||||
|
||||
async def start_backfill(
|
||||
get_hist,
|
||||
frame_types: dict[str, Duration] | None,
|
||||
mod: ModuleType,
|
||||
mkt: MktPair,
|
||||
shm: ShmArray,
|
||||
|
@ -381,7 +394,6 @@ async def start_backfill(
|
|||
60: {'years': 6},
|
||||
}
|
||||
period_duration: int = periods[timeframe]
|
||||
|
||||
update_start_on_prepend = True
|
||||
|
||||
# NOTE: manually set the "latest" datetime which we intend to
|
||||
|
@ -415,6 +427,28 @@ async def start_backfill(
|
|||
end_dt=last_start_dt,
|
||||
)
|
||||
|
||||
except NoData as _daterr:
|
||||
# 3 cases:
|
||||
# - frame in the middle of a legit venue gap
|
||||
# - history actually began at the `last_start_dt`
|
||||
# - some other unknown error (ib blocking the
|
||||
# history bc they don't want you seeing how they
|
||||
# cucked all the tinas..)
|
||||
if dur := frame_types.get(timeframe):
|
||||
# decrement by a frame's worth of duration and
|
||||
# retry a few times.
|
||||
last_start_dt.subtract(
|
||||
seconds=dur.total_seconds()
|
||||
)
|
||||
log.warning(
|
||||
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
|
||||
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
|
||||
'bf_until <- last_start_dt:\n'
|
||||
f'{backfill_until_dt} <- {last_start_dt}\n'
|
||||
f'Decrementing `end_dt` by {dur} and retry..\n'
|
||||
)
|
||||
continue
|
||||
|
||||
# broker says there never was or is no more history to pull
|
||||
except DataUnavailable:
|
||||
log.warning(
|
||||
|
@ -871,7 +905,7 @@ async def tsdb_backfill(
|
|||
):
|
||||
log.info(
|
||||
f'`{mod}` history client returned backfill config:\n'
|
||||
f'{config}\n'
|
||||
f'{pformat(config)}\n'
|
||||
)
|
||||
|
||||
dt_eps: list[DateTime, DateTime] = []
|
||||
|
@ -943,6 +977,7 @@ async def tsdb_backfill(
|
|||
partial(
|
||||
start_backfill,
|
||||
get_hist=get_hist,
|
||||
frame_types=config.get('frame_types', None),
|
||||
mod=mod,
|
||||
mkt=mkt,
|
||||
shm=shm,
|
||||
|
|
|
@ -21,15 +21,16 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
|
|||
types.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from collections import UserList
|
||||
from pprint import (
|
||||
pformat,
|
||||
saferepr,
|
||||
)
|
||||
from typing import Any
|
||||
|
||||
from msgspec import (
|
||||
msgpack,
|
||||
Struct,
|
||||
Struct as _Struct,
|
||||
structs,
|
||||
)
|
||||
|
||||
|
@ -62,7 +63,7 @@ class DiffDump(UserList):
|
|||
|
||||
|
||||
class Struct(
|
||||
Struct,
|
||||
_Struct,
|
||||
|
||||
# https://jcristharif.com/msgspec/structs.html#tagged-unions
|
||||
# tag='pikerstruct',
|
||||
|
@ -72,9 +73,27 @@ class Struct(
|
|||
A "human friendlier" (aka repl buddy) struct subtype.
|
||||
|
||||
'''
|
||||
def _sin_props(self) -> Iterator[
|
||||
tuple[
|
||||
structs.FieldIinfo,
|
||||
str,
|
||||
Any,
|
||||
]
|
||||
]:
|
||||
'''
|
||||
Iterate over all non-@property fields of this struct.
|
||||
|
||||
'''
|
||||
fi: structs.FieldInfo
|
||||
for fi in structs.fields(self):
|
||||
key: str = fi.name
|
||||
val: Any = getattr(self, key)
|
||||
yield fi, key, val
|
||||
|
||||
def to_dict(
|
||||
self,
|
||||
include_non_members: bool = True,
|
||||
|
||||
) -> dict:
|
||||
'''
|
||||
Like it sounds.. direct delegation to:
|
||||
|
@ -90,16 +109,72 @@ class Struct(
|
|||
|
||||
# only return a dict of the struct members
|
||||
# which were provided as input, NOT anything
|
||||
# added as `@properties`!
|
||||
# added as type-defined `@property` methods!
|
||||
sin_props: dict = {}
|
||||
for fi in structs.fields(self):
|
||||
key: str = fi.name
|
||||
sin_props[key] = asdict[key]
|
||||
fi: structs.FieldInfo
|
||||
for fi, k, v in self._sin_props():
|
||||
sin_props[k] = asdict[k]
|
||||
|
||||
return sin_props
|
||||
|
||||
def pformat(self) -> str:
|
||||
return f'Struct({pformat(self.to_dict())})'
|
||||
def pformat(
|
||||
self,
|
||||
field_indent: int = 2,
|
||||
indent: int = 0,
|
||||
|
||||
) -> str:
|
||||
'''
|
||||
Recursion-safe `pprint.pformat()` style formatting of
|
||||
a `msgspec.Struct` for sane reading by a human using a REPL.
|
||||
|
||||
'''
|
||||
# global whitespace indent
|
||||
ws: str = ' '*indent
|
||||
|
||||
# field whitespace indent
|
||||
field_ws: str = ' '*(field_indent + indent)
|
||||
|
||||
# qtn: str = ws + self.__class__.__qualname__
|
||||
qtn: str = self.__class__.__qualname__
|
||||
|
||||
obj_str: str = '' # accumulator
|
||||
fi: structs.FieldInfo
|
||||
k: str
|
||||
v: Any
|
||||
for fi, k, v in self._sin_props():
|
||||
|
||||
# TODO: how can we prefer `Literal['option1', 'option2,
|
||||
# ..]` over .__name__ == `Literal` but still get only the
|
||||
# latter for simple types like `str | int | None` etc..?
|
||||
ft: type = fi.type
|
||||
typ_name: str = getattr(ft, '__name__', str(ft))
|
||||
|
||||
# recurse to get sub-struct's `.pformat()` output Bo
|
||||
if isinstance(v, Struct):
|
||||
val_str: str = v.pformat(
|
||||
indent=field_indent + indent,
|
||||
field_indent=indent + field_indent,
|
||||
)
|
||||
|
||||
else: # the `pprint` recursion-safe format:
|
||||
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
|
||||
val_str: str = saferepr(v)
|
||||
|
||||
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
|
||||
|
||||
return (
|
||||
f'{qtn}(\n'
|
||||
f'{obj_str}'
|
||||
f'{ws})'
|
||||
)
|
||||
|
||||
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
|
||||
# inside a known tty?
|
||||
# def __repr__(self) -> str:
|
||||
# ...
|
||||
|
||||
# __str__ = __repr__ = pformat
|
||||
__repr__ = pformat
|
||||
|
||||
def copy(
|
||||
self,
|
||||
|
|
|
@ -50,15 +50,16 @@ from ._display import DisplayState
|
|||
from ._interaction import ChartView
|
||||
from ._editors import SelectRect
|
||||
from ._chart import ChartPlotWidget
|
||||
from ._dataviz import Viz
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
# NOTE: this is set by the `._display.graphics_update_loop()` once
|
||||
# all chart widgets / Viz per flume have been initialized allowing
|
||||
# for remote annotation (control) of any chart-actor's mkt feed by
|
||||
# fqme lookup Bo
|
||||
_dss: dict[str, DisplayState] | None = None
|
||||
# NOTE: this is UPDATED by the `._display.graphics_update_loop()`
|
||||
# once all chart widgets / Viz per flume have been initialized
|
||||
# allowing for remote annotation (control) of any chart-actor's mkt
|
||||
# feed by fqme lookup Bo
|
||||
_dss: dict[str, DisplayState] = {}
|
||||
|
||||
# stash each and every client connection so that they can all
|
||||
# be cancelled on shutdown/error.
|
||||
|
@ -96,7 +97,7 @@ async def serve_rc_annots(
|
|||
async for msg in annot_req_stream:
|
||||
match msg:
|
||||
case {
|
||||
'annot': 'SelectRect',
|
||||
'cmd': 'SelectRect',
|
||||
'fqme': fqme,
|
||||
'timeframe': timeframe,
|
||||
'meth': str(meth),
|
||||
|
@ -110,14 +111,6 @@ async def serve_rc_annots(
|
|||
}[timeframe]
|
||||
cv: ChartView = chart.cv
|
||||
|
||||
# sanity
|
||||
if timeframe == 60:
|
||||
assert (
|
||||
chart.linked.godwidget.hist_linked.chart.view
|
||||
is
|
||||
cv
|
||||
)
|
||||
|
||||
# annot type lookup from cmd
|
||||
rect = SelectRect(
|
||||
viewbox=cv,
|
||||
|
@ -143,7 +136,8 @@ async def serve_rc_annots(
|
|||
await annot_req_stream.send(aid)
|
||||
|
||||
case {
|
||||
'rm_annot': int(aid),
|
||||
'cmd': 'remove',
|
||||
'aid': int(aid),
|
||||
}:
|
||||
# NOTE: this is normally entered on
|
||||
# a client's annotation de-alloc normally
|
||||
|
@ -156,19 +150,29 @@ async def serve_rc_annots(
|
|||
await annot_req_stream.send(aid)
|
||||
|
||||
case {
|
||||
'cmd': 'redraw',
|
||||
'fqme': fqme,
|
||||
'render': int(aid),
|
||||
'viz_name': str(viz_name),
|
||||
'timeframe': timeframe,
|
||||
|
||||
# TODO: maybe more fields?
|
||||
# 'render': int(aid),
|
||||
# 'viz_name': str(viz_name),
|
||||
}:
|
||||
# NOTE: old match from the 60s display loop task
|
||||
# | {
|
||||
# 'backfilling': (str(viz_name), timeframe),
|
||||
# }:
|
||||
ds: DisplayState = _dss[viz_name]
|
||||
chart: ChartPlotWidget = {
|
||||
60: ds.hist_chart,
|
||||
1: ds.chart,
|
||||
ds: DisplayState = _dss[fqme]
|
||||
viz: Viz = {
|
||||
60: ds.hist_viz,
|
||||
1: ds.viz,
|
||||
}[timeframe]
|
||||
log.warning(
|
||||
f'Forcing VIZ REDRAW:\n'
|
||||
f'fqme: {fqme}\n'
|
||||
f'timeframe: {timeframe}\n'
|
||||
)
|
||||
viz.reset_graphics()
|
||||
|
||||
case _:
|
||||
log.error(
|
||||
|
@ -227,6 +231,18 @@ class AnnotCtl(Struct):
|
|||
# ids to their equivalent IPC msg-streams.
|
||||
_ipcs: dict[int, MsgStream] = {}
|
||||
|
||||
def _get_ipc(
|
||||
self,
|
||||
fqme: str,
|
||||
) -> MsgStream:
|
||||
ipc: MsgStream = self.fqme2ipc.get(fqme)
|
||||
if ipc is None:
|
||||
raise SymbolNotFound(
|
||||
'No chart (actor) seems to have mkt feed loaded?\n'
|
||||
f'{fqme}'
|
||||
)
|
||||
return ipc
|
||||
|
||||
async def add_rect(
|
||||
self,
|
||||
fqme: str,
|
||||
|
@ -246,16 +262,10 @@ class AnnotCtl(Struct):
|
|||
the instances `id(obj)` from the remote UI actor.
|
||||
|
||||
'''
|
||||
ipc: MsgStream = self.fqme2ipc.get(fqme)
|
||||
if ipc is None:
|
||||
raise SymbolNotFound(
|
||||
'No chart (actor) seems to have mkt feed loaded?\n'
|
||||
f'{fqme}'
|
||||
)
|
||||
|
||||
ipc: MsgStream = self._get_ipc(fqme)
|
||||
await ipc.send({
|
||||
'fqme': fqme,
|
||||
'annot': 'SelectRect',
|
||||
'cmd': 'SelectRect',
|
||||
'timeframe': timeframe,
|
||||
# 'meth': str(meth),
|
||||
'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos',
|
||||
|
@ -288,7 +298,8 @@ class AnnotCtl(Struct):
|
|||
'''
|
||||
ipc: MsgStream = self._ipcs[aid]
|
||||
await ipc.send({
|
||||
'rm_annot': aid,
|
||||
'cmd': 'remove',
|
||||
'aid': aid,
|
||||
})
|
||||
removed: bool = await ipc.receive()
|
||||
return removed
|
||||
|
@ -307,6 +318,19 @@ class AnnotCtl(Struct):
|
|||
finally:
|
||||
await self.remove(aid)
|
||||
|
||||
async def redraw(
|
||||
self,
|
||||
fqme: str,
|
||||
timeframe: float,
|
||||
) -> None:
|
||||
await self._get_ipc(fqme).send({
|
||||
'cmd': 'redraw',
|
||||
'fqme': fqme,
|
||||
# 'render': int(aid),
|
||||
# 'viz_name': str(viz_name),
|
||||
'timeframe': timeframe,
|
||||
})
|
||||
|
||||
# TODO: do we even need this?
|
||||
# async def modify(
|
||||
# self,
|
||||
|
|
|
@ -358,7 +358,7 @@ class OrderMode:
|
|||
send_msg: bool = True,
|
||||
order: Order | None = None,
|
||||
|
||||
) -> Dialog:
|
||||
) -> Dialog | None:
|
||||
'''
|
||||
Send execution order to EMS return a level line to
|
||||
represent the order on a chart.
|
||||
|
@ -378,6 +378,16 @@ class OrderMode:
|
|||
'oid': oid,
|
||||
})
|
||||
|
||||
if order.price <= 0:
|
||||
log.error(
|
||||
'*!? Invalid `Order.price <= 0` ?!*\n'
|
||||
# TODO: make this present multi-line in object form
|
||||
# like `ib_insync.contracts.Contract.__repr__()`
|
||||
f'{order}\n'
|
||||
)
|
||||
self.cancel_orders([order.oid])
|
||||
return None
|
||||
|
||||
lines = self.lines_from_order(
|
||||
order,
|
||||
show_markers=True,
|
||||
|
@ -663,7 +673,7 @@ class OrderMode:
|
|||
self,
|
||||
msg: Status,
|
||||
|
||||
) -> Dialog:
|
||||
) -> Dialog | None:
|
||||
# NOTE: the `.order` attr **must** be set with the
|
||||
# equivalent order msg in order to be loaded.
|
||||
order = msg.req
|
||||
|
@ -694,12 +704,15 @@ class OrderMode:
|
|||
fqsn=fqme,
|
||||
info={},
|
||||
)
|
||||
dialog = self.submit_order(
|
||||
maybe_dialog: Dialog | None = self.submit_order(
|
||||
send_msg=False,
|
||||
order=order,
|
||||
)
|
||||
assert self.dialogs[oid] == dialog
|
||||
return dialog
|
||||
if maybe_dialog is None:
|
||||
return None
|
||||
|
||||
assert self.dialogs[oid] == maybe_dialog
|
||||
return maybe_dialog
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
@ -1079,7 +1092,24 @@ async def process_trade_msg(
|
|||
)
|
||||
):
|
||||
msg.req = order
|
||||
dialog = mode.load_unknown_dialog_from_msg(msg)
|
||||
dialog: (
|
||||
Dialog
|
||||
# NOTE: on an invalid order submission (eg.
|
||||
# price <=0) the downstream APIs may return
|
||||
# a null.
|
||||
| None
|
||||
) = mode.load_unknown_dialog_from_msg(msg)
|
||||
|
||||
# cancel any invalid pre-existing order!
|
||||
if dialog is None:
|
||||
log.warning(
|
||||
'Order was ignored/invalid?\n'
|
||||
f'{order}'
|
||||
)
|
||||
|
||||
# if valid, display the order line the same as if
|
||||
# it was submitted during this UI session.
|
||||
else:
|
||||
mode.on_submit(oid)
|
||||
|
||||
case Status(resp='error'):
|
||||
|
|
Loading…
Reference in New Issue